Serge Rielau created SPARK-56939:
------------------------------------

             Summary: Resolve deadlock between USE and function lookup
                 Key: SPARK-56939
                 URL: https://issues.apache.org/jira/browse/SPARK-56939
             Project: Spark
          Issue Type: Sub-task
          Components: Spark Core
    Affects Versions: 4.2.0
            Reporter: Serge Rielau


h1. Summary

{{SessionCatalog}}/{{CatalogManager}} lock-order inversion can deadlock 
concurrent {{USE SCHEMA}}/{{USE CATALOG}} and unqualified function 
resolution.

 * Issue Type: Bug
 * Component(s): SQL
 * Affects Version/s: 4.2.0 (introduced by SPARK-56750)
 * Priority: Major
 * Labels: correctness, concurrency
----
h1. Description

After SPARK-56750 wired {{CatalogManager}} into {{SessionCatalog}} as the live 
source for the path-driven session function kinds, two 
{{SessionCatalog}}/{{CatalogManager}} code paths form a lock-order 
inversion that can deadlock when they run on different threads of the same 
{{SparkSession}}.
h2. The cycle

Arm 1: {{SessionCatalog.synchronized}} → {{CatalogManager.synchronized}}

{{SessionCatalog.lookupFunctionInfo}} (still {{synchronized}} on the catalog) 
runs unqualified function resolution, which evaluates 
{{sessionFunctionKindsInResolutionOrder}}. With the live {{CatalogManager}} 
bound (as in production), that derivation needs the catalog manager's lock to 
read {{currentCatalog}}, {{currentNamespace}}, and the session path:

    private def sessionFunctionKindsInResolutionOrder: Seq[SessionCatalog.SessionFunctionKind] =
      sessionFunctionKindsTestOverride.getOrElse {
        catalogManagerForSessionFunctionKinds match {
          case Some(cm) =>
            CatalogManager.systemFunctionKindsFromPath(
              cm.sqlResolutionPathEntries(cm.currentCatalog.name(), cm.currentNamespace.toSeq))
          case None =>
            CatalogManager.systemFunctionKindsFromPath(conf.systemPathOrder)
        }
      }
 
So: lookup acquires the {{SessionCatalog}} intrinsic lock, then needs the 
{{CatalogManager}} intrinsic lock.

Arm 2: {{CatalogManager.synchronized}} → {{SessionCatalog.synchronized}}

{{CatalogManager.setCurrentNamespace}}, {{setCurrentCatalog}}, 
{{setCurrentCatalogAndNamespace}}, and {{setCurrentCatalogWithoutCheck}} 
all hold {{CatalogManager.synchronized}} and then call back into 
{{v1SessionCatalog.setCurrentDatabase*}}, which re-acquires 
{{SessionCatalog}}:

    def setCurrentNamespace(namespace: Array[String]): Unit = synchronized {
      if (isSessionCatalog(currentCatalog) && namespace.length == 1) {
        v1SessionCatalog.setCurrentDatabaseWithNameCheck(
          namespace.head,
          _ => assertNamespaceExist(namespace))
      } else {
        assertNamespaceExist(namespace)
      }
      _currentNamespace = Some(namespace)
    }

So: {{USE SCHEMA}}/{{USE CATALOG}} acquire the {{CatalogManager}} lock, 
then need the {{SessionCatalog}} lock.
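
The two arms can be modelled without Spark. The following is a minimal sketch, assuming two plain monitor objects stand in for {{SessionCatalog}} and {{CatalogManager}}; {{LockCycleModel}} and its members are hypothetical names, and a latch forces the unlucky interleaving that production code would only hit under contention.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}

// Hypothetical model of the cycle; these are NOT Spark classes.
object LockCycleModel {
  private val sessionCatalog = new Object // stands in for SessionCatalog's monitor
  private val catalogManager = new Object // stands in for CatalogManager's monitor

  /** Starts both arms and returns each thread's state once it is stuck. */
  def run(): Map[String, Thread.State] = {
    val bothHoldFirstLock = new CountDownLatch(2)

    def arm(name: String, first: Object, second: Object): Thread = {
      val t = new Thread(() => {
        first.synchronized {
          bothHoldFirstLock.countDown()
          bothHoldFirstLock.await()   // force the bad interleaving
          second.synchronized { () }  // never entered: the other arm owns it
        }
      }, name)
      t.setDaemon(true)               // let the JVM exit despite the hang
      t.start()
      t
    }

    val threads = Seq(
      arm("lookup", sessionCatalog, catalogManager),     // Arm 1 order
      arm("use-schema", catalogManager, sessionCatalog)) // Arm 2 order

    // Poll until both threads are blocked on monitor entry (give up after ~5 s).
    val deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5)
    while (threads.exists(_.getState != Thread.State.BLOCKED) &&
        System.nanoTime() < deadline) {
      Thread.sleep(10)
    }
    threads.map(t => t.getName -> t.getState).toMap
  }
}
```

Calling {{LockCycleModel.run()}} leaves both daemon threads permanently blocked, which is the state the real code paths reach when the same interleaving occurs.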
h2. Triggering conditions

The deadlock requires two threads sharing one {{SparkSession}} such that:
 # One thread is executing any SQL that contains an unqualified function 
reference (extremely common), so it runs {{lookupFunctionInfo}}.
 # The other thread is executing {{USE SCHEMA …}}, {{USE CATALOG …}}, 
or {{USE catalog.schema}}, so it runs the matching 
{{setCurrentNamespace}}/{{setCurrentCatalog}} path.

Both can also be the _same_ thread interleaved across futures if any rule 
invoked from inside an analyzer scope triggers a {{USE}}-equivalent.
h2. Independence from {{spark.sql.functionResolution.sessionOrder}}

The hazard is independent of the function-resolution order setting 
({{first}} / {{second}} / {{last}}):
 * Arm 1 acquires {{CatalogManager.synchronized}} to _read what the order 
is_. Whatever the order value, the lock is taken.
 * Arm 2 is a property of {{USE}}-style state mutation and has nothing to 
do with order at all.

h2. Why this hasn't been caught earlier

The in-source comment on {{SessionCatalog.lookupBuiltinOrTempFunction}} is 
explicit about Arm 1: the method is intentionally _not_ {{synchronized}}, to 
avoid this very cycle. That partial mitigation only covers 
{{lookupBuiltinOrTempFunction}} itself; {{lookupFunctionInfo}}, 
{{lookupFunctionInfoByIdentifier}}, and other {{synchronized}} callers that 
also reach {{lookupBuiltinOrTempFunction}}/{{lookupPersistentFunction}} retain 
the hazard.

The concurrent-{{SET PATH}} smoke test in {{SetPathSuite}} added by 
SPARK-56853 surfaces a contention-only variant of this problem. {{SET PATH}} 
does not itself call back into {{v1SessionCatalog}}, so it produces lock 
contention but no true deadlock: the test fails by _timeout under load_, not 
by a hang. A targeted test with {{USE SCHEMA}} on the setter thread reliably 
reproduces the full hang.
h2. Proposed fix

Two minimal changes:
 # Drop {{synchronized}} on {{SessionCatalog.lookupFunctionInfo}} (and any 
peers that share the hazard). Document that unqualified resolution must not 
hold the catalog's intrinsic lock while consulting the bound 
{{CatalogManager}}. This kills Arm 1.
 # In 
{{CatalogManager.setCurrentNamespace}}/{{setCurrentCatalog}}/{{setCurrentCatalogAndNamespace}}/{{setCurrentCatalogWithoutCheck}},
 narrow the {{synchronized}} region so the v1-catalog callbacks run _outside_ 
the manager's intrinsic lock. The state read needed for the decision can be 
snapshotted under the lock, the callback run outside it, and the 
namespace/catalog write done under the lock. This kills Arm 2.

Either change alone breaks the cycle; both make the locking discipline robust 
against future regressions.
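
The second change (snapshot under the lock, callback outside it, write under the lock) might look roughly like the sketch below. It is not the actual patch: {{V1CatalogModel}}, {{CatalogManagerModel}}, and their members are hypothetical stand-ins, and the Arm 1 shape is deliberately left in place to show that narrowing the setter alone is enough to break the cycle.

```scala
// Hypothetical model of the narrowed critical section; NOT Spark code.
object NarrowedLockDemo {
  final class V1CatalogModel {
    private var currentDb = "default"
    def setCurrentDatabase(db: String): Unit = synchronized { currentDb = db }
    // Arm 1 shape, left as-is: catalog lock first, then the manager lock.
    def lookup(cm: CatalogManagerModel): Seq[String] =
      synchronized { cm.currentNamespace }
  }

  final class CatalogManagerModel(v1: V1CatalogModel) {
    private var usingSessionCatalog = true
    private var namespace: Seq[String] = Seq("default")

    def currentNamespace: Seq[String] = synchronized { namespace }

    def setCurrentNamespace(ns: Seq[String]): Unit = {
      // 1. Snapshot the state needed for the decision under the manager lock.
      val callV1 = synchronized { usingSessionCatalog && ns.length == 1 }
      // 2. Run the v1 callback with no manager lock held.
      if (callV1) v1.setCurrentDatabase(ns.head)
      // 3. Publish the new namespace under the manager lock.
      synchronized { namespace = ns }
    }
  }

  /** Returns true if both threads finish, i.e. no deadlock occurred. */
  def run(iterations: Int = 10000): Boolean = {
    val v1 = new V1CatalogModel
    val cm = new CatalogManagerModel(v1)
    val lookup = new Thread(
      () => (0 until iterations).foreach(_ => v1.lookup(cm)), "lookup")
    val use = new Thread(
      () => (0 until iterations).foreach(i => cm.setCurrentNamespace(Seq(s"s${i % 2}"))),
      "use-schema")
    Seq(lookup, use).foreach { t => t.setDaemon(true); t.start() }
    lookup.join(10000); use.join(10000)
    !lookup.isAlive && !use.isAlive
  }
}
```

Because the setter never holds the manager lock while it takes the catalog lock, no thread in this model ever holds both locks in the Arm 2 order. The sketch does not order two concurrent setters relative to each other; between steps 2 and 3 the v1 database and the manager namespace can briefly disagree, which a real change would need to consider.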

For the path-derived kinds, introduce a 
{{CatalogManager.sessionFunctionKindsForUnqualifiedResolution}} helper that 
snapshots {{currentCatalog}}, {{currentNamespace}}, and the path 
entries together under a single critical section; then {{SessionCatalog}} and 
{{FunctionResolution.isSessionBeforeBuiltinInPath}} consult that one helper 
instead of taking two separate reads, eliminating torn-state observations under 
racing path updates.
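
The single-critical-section idea can be illustrated with a small model. This is a sketch under stated assumptions, not the proposed helper itself: {{ManagerState}}, {{ResolutionSnapshot}}, and the placeholder values are hypothetical, and the three fields merely stand in for the current catalog, the current namespace, and the path entries.

```scala
// Hypothetical model of a consistent snapshot; NOT Spark's API.
object SnapshotModel {
  final case class ResolutionSnapshot(
      catalog: String, namespace: Seq[String], path: Seq[String])

  final class ManagerState {
    private var catalog = "catA"
    private var namespace: Seq[String] = Seq("nsA")
    private var path: Seq[String] = Seq("pathA")

    /** Writers change all three fields inside one critical section. */
    def update(tag: String): Unit = synchronized {
      catalog = s"cat$tag"
      namespace = Seq(s"ns$tag")
      path = Seq(s"path$tag")
    }

    /** Readers copy all three fields inside one critical section. */
    def snapshot(): ResolutionSnapshot = synchronized {
      ResolutionSnapshot(catalog, namespace, path)
    }
  }

  /** Counts snapshots whose fields do not share one tag; expected to be 0. */
  def countTornReads(iterations: Int = 20000): Int = {
    val state = new ManagerState
    val writer = new Thread(
      () => (0 until iterations).foreach(i => state.update(if (i % 2 == 0) "A" else "B")),
      "writer")
    writer.setDaemon(true)
    writer.start()
    val torn = (0 until iterations).count { _ =>
      val s = state.snapshot()
      val tag = s.catalog.stripPrefix("cat")
      s.namespace != Seq(s"ns$tag") || s.path != Seq(s"path$tag")
    }
    writer.join()
    torn
  }
}
```

A reader that instead called three separate synchronized getters could observe the catalog from one update and the path from the next; returning one immutable value from one critical section rules that out.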
h2. Repro

A standalone unit suite using {{SparkSession}} (no DBR-specific catalog 
wrappers needed):

    import java.util.concurrent.{ConcurrentLinkedQueue, CyclicBarrier}
    import org.apache.spark.sql.SparkSession

    val session = SparkSession.builder().master("local[*]").getOrCreate()
    val barrier = new CyclicBarrier(2)
    val errors = new ConcurrentLinkedQueue[Throwable]()

    // Arm 1: every query resolves the unqualified function count(*).
    val lookupThread = new Thread(() => {
      try {
        barrier.await()
        for (_ <- 0 until 200) {
          session.sql("SELECT count(*) FROM VALUES (1), (2), (3) AS t(a)").head()
        }
      } catch { case t: Throwable => errors.add(t) }
    }, "lookup")

    // Arm 2: alternating USE SCHEMA drives setCurrentNamespace.
    val useThread = new Thread(() => {
      try {
        session.sql("CREATE SCHEMA IF NOT EXISTS s1")
        session.sql("CREATE SCHEMA IF NOT EXISTS s2")
        barrier.await()
        for (i <- 0 until 200) {
          session.sql(if ((i % 2) == 0) "USE SCHEMA s1" else "USE SCHEMA s2")
        }
      } catch { case t: Throwable => errors.add(t) }
    }, "use-schema")

    lookupThread.start(); useThread.start()
    // Bounded joins: a deadlock shows up as a thread that is still alive.
    lookupThread.join(30000); useThread.join(30000)
    require(!lookupThread.isAlive && !useThread.isAlive,
      "deadlock detected: SessionCatalog/CatalogManager lock-order inversion")

Under contention this leaves both threads {{BLOCKED}}, each waiting to enter 
the intrinsic lock held by the other; a thread dump shows the cycle directly.
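
The same information a thread dump gives can be collected programmatically through the JVM's standard {{ThreadMXBean}}. The helper below is a hedged sketch: {{DeadlockReport}} is a hypothetical name and is not part of the proposed test suite.

```scala
import java.lang.management.ManagementFactory

// Hypothetical helper for test diagnostics; NOT part of Spark.
object DeadlockReport {
  /** One line per thread caught in a monitor deadlock; empty if there is none. */
  def describe(): Seq[String] = {
    val mx = ManagementFactory.getThreadMXBean
    // findMonitorDeadlockedThreads returns null when no deadlock exists.
    val ids = Option(mx.findMonitorDeadlockedThreads()).getOrElse(Array.empty[Long])
    if (ids.isEmpty) {
      Seq.empty
    } else {
      // getThreadInfo may return null entries for threads that have since died.
      mx.getThreadInfo(ids).toSeq.filter(_ != null).map { info =>
        s"${info.getThreadName} waits for ${info.getLockName} " +
          s"held by ${info.getLockOwnerName}"
      }
    }
  }
}
```

One way to use it would be to call {{DeadlockReport.describe()}} after the bounded joins in the repro and include the result in the failure message, so the two thread names and the contended monitors appear in the test output.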


