[ https://issues.apache.org/jira/browse/SPARK-56939?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated SPARK-56939:
-----------------------------------
    Labels: pull-request-available  (was: )

> Resolve deadlock between USE and function lookup
> ------------------------------------------------
>
>                 Key: SPARK-56939
>                 URL: https://issues.apache.org/jira/browse/SPARK-56939
>             Project: Spark
>          Issue Type: Sub-task
>          Components: Spark Core
>    Affects Versions: 4.2.0
>            Reporter: Serge Rielau
>            Priority: Major
>              Labels: pull-request-available
>
> h1. Summary
> {{SessionCatalog}}/{{CatalogManager}} lock-order inversion can deadlock
> concurrent {{USE SCHEMA}}/{{USE CATALOG}} and unqualified function resolution.
>
> Issue Type: Bug
> Component(s): SQL
> Affects Version/s: 4.2.0 (introduced by SPARK-56750)
> Priority: Major
> Labels: correctness, concurrency
> ----
> h1. Description
> After SPARK-56750 wired {{CatalogManager}} into {{SessionCatalog}} as the
> live source for the path-driven session function kinds, two
> {{SessionCatalog}}/{{CatalogManager}} code paths form a lock-order inversion
> that can deadlock when both run on different threads of the same
> {{SparkSession}}.
> h2. The cycle
> Arm 1: {{SessionCatalog.synchronized}} → {{CatalogManager.synchronized}}
> {{SessionCatalog.lookupFunctionInfo}} (still {{synchronized}} on the catalog)
> runs unqualified function resolution, which evaluates
> {{sessionFunctionKindsInResolutionOrder}}. With the live {{CatalogManager}}
> bound (as in production), that derivation needs the catalog manager's lock to
> read {{currentCatalog}}, {{currentNamespace}}, and the session path:
> {code:scala}
> private def sessionFunctionKindsInResolutionOrder: Seq[SessionCatalog.SessionFunctionKind] =
>   sessionFunctionKindsTestOverride.getOrElse {
>     catalogManagerForSessionFunctionKinds match {
>       case Some(cm) =>
>         CatalogManager.systemFunctionKindsFromPath(
>           cm.sqlResolutionPathEntries(cm.currentCatalog.name(), cm.currentNamespace.toSeq))
>       case None =>
>         CatalogManager.systemFunctionKindsFromPath(conf.systemPathOrder)
>     }
>   }
> {code}
> So: lookup acquires the {{SessionCatalog}} intrinsic lock, then needs the 
> {{CatalogManager}} intrinsic lock.
> Arm 2: {{CatalogManager.synchronized}} → {{SessionCatalog.synchronized}}
> {{CatalogManager.setCurrentNamespace}}, {{setCurrentCatalog}},
> {{setCurrentCatalogAndNamespace}}, and {{setCurrentCatalogWithoutCheck}}
> all hold {{CatalogManager.synchronized}} and then call back into
> {{v1SessionCatalog.setCurrentDatabase*}}, which acquires the
> {{SessionCatalog}} lock:
> {code:scala}
> def setCurrentNamespace(namespace: Array[String]): Unit = synchronized {
>   if (isSessionCatalog(currentCatalog) && namespace.length == 1) {
>     v1SessionCatalog.setCurrentDatabaseWithNameCheck(
>       namespace.head,
>       _ => assertNamespaceExist(namespace))
>   } else {
>     assertNamespaceExist(namespace)
>   }
>   _currentNamespace = Some(namespace)
> }
> {code}
> So: {{USE SCHEMA}}/{{USE CATALOG}} acquire the {{CatalogManager}} lock, then
> need the {{SessionCatalog}} lock.
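The two arms can be reproduced without Spark in a minimal sketch. Two {{ReentrantLock}}s stand in for the {{SessionCatalog}} and {{CatalogManager}} intrinsic locks, and latches force the bad interleaving; {{LockOrderModel}} and {{cycleCloses}} are hypothetical names. {{tryLock}} with a timeout replaces {{synchronized}} only so that the demo reports the cycle instead of hanging.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.ReentrantLock

// Hypothetical model of the two arms; no Spark classes are involved.
// ReentrantLock stands in for the intrinsic locks only so that a blocked
// acquisition times out instead of hanging the demo.
object LockOrderModel {
  /** Returns true if each arm, while holding its first lock, fails to get
   *  its second one: the signature of the lock-order inversion. */
  def cycleCloses(): Boolean = {
    val sessionCatalogLock = new ReentrantLock()
    val catalogManagerLock = new ReentrantLock()
    val bothHoldFirst = new CountDownLatch(2) // force the bad interleaving
    val bothTried = new CountDownLatch(2)     // keep first locks until both have tried
    val arm1Blocked = new AtomicBoolean(false)
    val arm2Blocked = new AtomicBoolean(false)

    def arm(first: ReentrantLock, second: ReentrantLock, blocked: AtomicBoolean): Thread =
      new Thread(() => {
        first.lock()
        try {
          bothHoldFirst.countDown()
          bothHoldFirst.await()
          // With `synchronized`, this acquisition would block forever.
          if (second.tryLock(200, TimeUnit.MILLISECONDS)) second.unlock()
          else blocked.set(true)
          bothTried.countDown()
          bothTried.await()
        } finally first.unlock()
      })

    // Arm 1: lookupFunctionInfo holds SessionCatalog, then needs CatalogManager.
    val lookup = arm(sessionCatalogLock, catalogManagerLock, arm1Blocked)
    // Arm 2: setCurrentNamespace holds CatalogManager, then needs SessionCatalog.
    val use = arm(catalogManagerLock, sessionCatalogLock, arm2Blocked)
    lookup.start(); use.start()
    lookup.join(); use.join()
    arm1Blocked.get() && arm2Blocked.get()
  }
}

val deadlockWouldOccur = LockOrderModel.cycleCloses()
println(s"cycle closes: $deadlockWouldOccur")
```

With real intrinsic locks the same interleaving never returns; in this model both {{tryLock}} calls time out, which is how the sketch detects that each thread waits on a lock the other holds.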
> h2. Triggering conditions
> The deadlock requires two threads sharing one {{SparkSession}} such that:
>  # One thread is executing any SQL that contains an unqualified function
> reference (extremely common), so it runs {{lookupFunctionInfo}}.
>  # The other thread is executing {{USE SCHEMA …}}, {{USE CATALOG …}}, or
> {{USE catalog.schema}}, so it runs the matching
> {{setCurrentNamespace}}/{{setCurrentCatalog}} path.
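> Both arms can also originate from the _same_ logical operation, interleaved
> across futures that run on different threads, if any rule invoked from inside
> an analyzer scope triggers a {{USE}}-equivalent. (JVM intrinsic locks are
> reentrant, so a single thread cannot deadlock with itself on this cycle.)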
> h2. Independence from {{spark.sql.functionResolution.sessionOrder}}
> The hazard is independent of the function-resolution order setting
> ({{first}} / {{second}} / {{last}}):
>  * Arm 1 acquires {{CatalogManager.synchronized}} just to
> _read what the order is_. Whatever the order value, the lock is taken.
>  * Arm 2 is a property of {{USE}}-style state mutation and has nothing to do
> with the order at all.
> h2. Why this hasn't been caught earlier
> The in-source comment on {{SessionCatalog.lookupBuiltinOrTempFunction}} is
> explicit about Arm 1: that method is intentionally _not_ {{synchronized}} to
> avoid this very cycle. That partial mitigation only covers
> {{lookupBuiltinOrTempFunction}}; {{lookupFunctionInfo}},
> {{lookupFunctionInfoByIdentifier}}, and other {{synchronized}} callers that
> also fall into {{lookupBuiltinOrTempFunction}}/{{lookupPersistentFunction}}
> retain the hazard.
> The concurrent-{{SET PATH}} smoke test in {{SetPathSuite}}, added by
> SPARK-56853, surfaces the contention version of this problem. {{SET PATH}}
> does not itself call back into {{v1SessionCatalog}}, so it produces lock
> contention but no true deadlock: the test fails by _timeout under load_, not
> by a hang. A targeted test with {{USE SCHEMA}} on the setter thread reliably
> reproduces the full hang.
> h2. Proposed fix
> Two minimal changes:
>  # Drop {{synchronized}} on {{SessionCatalog.lookupFunctionInfo}} (and any
> peers that share the hazard). Document that unqualified resolution must not
> hold the catalog's intrinsic lock while consulting the bound
> {{CatalogManager}}. This kills Arm 1.
>  # In {{CatalogManager.setCurrentNamespace}}/{{setCurrentCatalog}}/
> {{setCurrentCatalogAndNamespace}}/{{setCurrentCatalogWithoutCheck}}, narrow
> the {{synchronized}} region so the v1-catalog callbacks run _outside_ the
> manager's intrinsic lock. The state needed for the decision can be
> snapshotted under the lock, the callback run outside it, and the
> namespace/catalog write done under the lock. This kills Arm 2.
> Either change alone breaks the cycle; both make the locking discipline robust 
> against future regressions.
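Change 2 can be sketched with simplified stand-ins; the class names mimic Spark's but are hypothetical and much smaller than the real ones. The lookup deliberately keeps Arm 1's locking, which illustrates that either change alone breaks the cycle: the setter snapshots under its lock, runs the v1 callback with no lock held, then publishes under its lock, so it never waits for the catalog monitor while holding its own.

```scala
// Hypothetical model of proposed change 2; the classes only mimic the shape of
// SessionCatalog and CatalogManager and are not Spark APIs.
object NarrowedSetterModel {
  final class ToySessionCatalog {
    private var currentDb = "default"
    def setCurrentDatabase(db: String): Unit = synchronized { currentDb = db }
    def currentDatabase: String = synchronized { currentDb }

    // Arm 1 deliberately left as-is: holds this monitor while reading the manager.
    def lookupFunctionInfo(manager: ToyCatalogManager): Int = synchronized {
      manager.currentNamespace.length
    }
  }

  final class ToyCatalogManager(v1SessionCatalog: ToySessionCatalog) {
    private var currentCatalogName = "spark_catalog"
    private var namespace = Array("default")

    def currentNamespace: Array[String] = synchronized { namespace }
    def setCurrentCatalog(name: String): Unit = synchronized { currentCatalogName = name }

    def setCurrentNamespace(ns: Array[String]): Unit = {
      // 1. Snapshot the state the decision needs, under the manager lock.
      val isSessionCatalog = synchronized { currentCatalogName == "spark_catalog" }
      // 2. Run the v1 callback with no manager lock held (Arm 2 is gone).
      if (isSessionCatalog && ns.length == 1) v1SessionCatalog.setCurrentDatabase(ns.head)
      // 3. Publish the new namespace under the manager lock. Concurrent setters
      //    can interleave between steps 1 and 3; a real fix must decide how to
      //    handle that window (for example by re-validating the snapshot here).
      synchronized { namespace = ns }
    }
  }

  def bothFinish(iterations: Int): Boolean = {
    val v1 = new ToySessionCatalog
    val manager = new ToyCatalogManager(v1)
    val lookup = new Thread(() => {
      for (_ <- 0 until iterations) v1.lookupFunctionInfo(manager)
    }, "lookup")
    val use = new Thread(() => {
      for (i <- 0 until iterations)
        manager.setCurrentNamespace(Array(if (i % 2 == 0) "s1" else "s2"))
    }, "use-schema")
    lookup.start(); use.start()
    lookup.join(10000); use.join(10000)
    !lookup.isAlive() && !use.isAlive() && v1.currentDatabase == "s2"
  }
}

val noHang = NarrowedSetterModel.bothFinish(iterations = 10000)
```

The trade-off of splitting the critical section is the check-then-act window marked in the comment: the deadlock disappears, but the setter no longer observes and updates state atomically.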
> For the path-derived kinds, introduce a
> {{CatalogManager.sessionFunctionKindsForUnqualifiedResolution}} helper that
> snapshots {{currentCatalog}}, {{currentNamespace}}, and the path entries
> together in a single critical section. {{SessionCatalog}} and
> {{FunctionResolution.isSessionBeforeBuiltinInPath}} then consult that one
> helper instead of taking two separate reads, which eliminates torn-state
> observations under racing path updates.
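The single-critical-section idea can be sketched as follows, assuming a simplified manager whose path always starts with the current catalog and namespace (an assumption made only so that consistency is easy to check). {{ResolutionSnapshot}} and {{resolutionSnapshot}} are hypothetical names, not the signature of the proposed {{sessionFunctionKindsForUnqualifiedResolution}} helper, and deriving function kinds from the snapshot is omitted.

```scala
// Hypothetical model of a single-critical-section snapshot; the names below are
// illustrative and are not the proposed helper's actual signature.
object SnapshotModel {
  final case class ResolutionSnapshot(
      catalog: String,
      namespace: Seq[String],
      pathEntries: Seq[String])

  final class ToyCatalogManager {
    private var catalog = "cat_a"
    private var namespace: Seq[String] = Seq("ns_a")
    private var pathEntries: Seq[String] = Seq("cat_a.ns_a", "system.builtin")

    // Writer: changes catalog, namespace and path as one unit.
    def use(cat: String, ns: String): Unit = synchronized {
      catalog = cat
      namespace = Seq(ns)
      pathEntries = Seq(s"$cat.$ns", "system.builtin")
    }

    // Reader: all three values come from one critical section, so they agree.
    def resolutionSnapshot(): ResolutionSnapshot = synchronized {
      ResolutionSnapshot(catalog, namespace, pathEntries)
    }
  }

  // In this model, a snapshot is consistent when the first path entry names
  // the snapshot's own catalog and namespace.
  def isConsistent(s: ResolutionSnapshot): Boolean =
    s.pathEntries.headOption.contains(s"${s.catalog}.${s.namespace.mkString(".")}")

  /** Counts snapshots that mix state from two different `use` calls. */
  def tornSnapshots(iterations: Int): Int = {
    val manager = new ToyCatalogManager
    val writer = new Thread(() => {
      for (i <- 0 until iterations)
        if (i % 2 == 0) manager.use("cat_b", "ns_b") else manager.use("cat_a", "ns_a")
    }, "writer")
    writer.start()
    var torn = 0
    for (_ <- 0 until iterations)
      if (!isConsistent(manager.resolutionSnapshot())) torn += 1
    writer.join()
    torn
  }
}

val tornCount = SnapshotModel.tornSnapshots(iterations = 100000)
```

Reading the three fields through separate {{synchronized}} accessors could instead combine values from two different {{use}} calls; taking them in one critical section rules that out.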
> h2. Repro
> A standalone unit suite using {{SparkSession}} (no DBR-specific catalog 
> wrappers needed):
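> {code:scala}
> import java.util.concurrent.{ConcurrentLinkedQueue, CyclicBarrier}
> import org.apache.spark.sql.SparkSession
>
> val session = SparkSession.builder().master("local[*]").getOrCreate()
> // Releases both threads together to maximize the interleaving.
> val barrier = new CyclicBarrier(2)
> val errors = new ConcurrentLinkedQueue[Throwable]()
>
> // Arm 1: every query resolves the unqualified function count().
> val lookupThread = new Thread(() => {
>   try {
>     barrier.await()
>     for (_ <- 0 until 200) {
>       session.sql("SELECT count(*) FROM VALUES (1), (2), (3) AS t(a)").head()
>     }
>   } catch { case t: Throwable => errors.add(t) }
> }, "lookup")
>
> // Arm 2: USE SCHEMA goes through the setCurrentNamespace path.
> val useThread = new Thread(() => {
>   try {
>     session.sql("CREATE SCHEMA IF NOT EXISTS s1")
>     session.sql("CREATE SCHEMA IF NOT EXISTS s2")
>     barrier.await()
>     for (i <- 0 until 200) {
>       session.sql(if (i % 2 == 0) "USE SCHEMA s1" else "USE SCHEMA s2")
>     }
>   } catch { case t: Throwable => errors.add(t) }
> }, "use-schema")
>
> lookupThread.start(); useThread.start()
> lookupThread.join(30000); useThread.join(30000)
> require(!lookupThread.isAlive && !useThread.isAlive,
>   "deadlock detected: SessionCatalog/CatalogManager lock-order inversion")
> // Surface failures other than a hang instead of silently dropping them.
> require(errors.isEmpty, s"unexpected errors: $errors")
> {code}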
> Under contention this leaves both threads in the {{BLOCKED}} state, each
> waiting to enter the monitor the other holds; a thread dump shows the cycle
> directly.
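Besides a manual thread dump, the cycle can be detected programmatically with the JDK's {{ThreadMXBean.findMonitorDeadlockedThreads}}, which returns the IDs of threads deadlocked on object monitors, or {{null}} if there are none. The sketch below is a hypothetical watchdog, not part of the proposed test: {{DeadlockWatchdog}} and its methods are illustrative names. It deliberately recreates the two arms on daemon threads and then polls for the cycle; a repro could run a similar poll instead of relying only on the join timeout.

```scala
import java.lang.management.ManagementFactory
import java.util.concurrent.CountDownLatch

object DeadlockWatchdog {
  /** Polls the JVM for threads deadlocked on object monitors and returns their
   *  names, or an empty Seq if no cycle appears before the timeout. */
  def awaitMonitorDeadlock(timeoutMs: Long): Seq[String] = {
    val mx = ManagementFactory.getThreadMXBean()
    val deadline = System.currentTimeMillis() + timeoutMs
    while (System.currentTimeMillis() < deadline) {
      val ids = mx.findMonitorDeadlockedThreads() // null when there is no cycle
      if (ids != null)
        return mx.getThreadInfo(ids).toSeq.filter(_ != null).map(_.getThreadName).sorted
      Thread.sleep(50)
    }
    Seq.empty
  }

  /** Recreates the two arms with intrinsic locks on daemon threads, forcing the
   *  interleaving so that the cycle is guaranteed to close. */
  def forceInversion(): Unit = {
    val sessionCatalog = new Object
    val catalogManager = new Object
    val bothHoldFirst = new CountDownLatch(2)
    def arm(name: String, first: AnyRef, second: AnyRef): Thread = {
      val t = new Thread(() => {
        first.synchronized {
          bothHoldFirst.countDown()
          bothHoldFirst.await()
          second.synchronized { () } // blocks forever once both threads get here
        }
      }, name)
      t.setDaemon(true) // hung by design; must not keep the JVM alive
      t
    }
    arm("lookup", sessionCatalog, catalogManager).start()
    arm("use-schema", catalogManager, sessionCatalog).start()
  }
}

DeadlockWatchdog.forceInversion()
val cycle = DeadlockWatchdog.awaitMonitorDeadlock(timeoutMs = 5000)
println(s"deadlocked threads: ${cycle.mkString(", ")}")
```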



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
