[
https://issues.apache.org/jira/browse/SPARK-56939?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
ASF GitHub Bot updated SPARK-56939:
-----------------------------------
Labels: pull-request-available (was: )
> Resolve deadlock between USE and function lookup
> ------------------------------------------------
>
> Key: SPARK-56939
> URL: https://issues.apache.org/jira/browse/SPARK-56939
> Project: Spark
> Issue Type: Sub-task
> Components: Spark Core
> Affects Versions: 4.2.0
> Reporter: Serge Rielau
> Priority: Major
> Labels: pull-request-available
>
> h1. Summary
> {{SessionCatalog}}/{{CatalogManager}} lock-order inversion can
> deadlock concurrent {{USE SCHEMA}}/{{USE CATALOG}} and unqualified
> function resolution
> Issue Type: Bug
> Component(s): SQL
> Affects Version/s: 4.2.0 (introduced by SPARK-56750)
> Priority: Major
> Labels: correctness, concurrency
> ----
> h1. Description
> After SPARK-56750 wired {{CatalogManager}} into {{SessionCatalog}} as the
> live source for the path-driven session function kinds, two
> {{SessionCatalog}}/{{CatalogManager}} code paths form a lock-order
> inversion that can deadlock when they run on different threads of the same
> {{SparkSession}}.
> h2. The cycle
> Arm 1: {{SessionCatalog.synchronized}} → {{CatalogManager.synchronized}}
> {{SessionCatalog.lookupFunctionInfo}} (still {{synchronized}} on the catalog)
> runs unqualified function resolution, which evaluates
> {{sessionFunctionKindsInResolutionOrder}}. With the live {{CatalogManager}}
> bound (as in production), that derivation needs the catalog manager's lock
> to read {{currentCatalog}}, {{currentNamespace}}, and the session path:
> private def sessionFunctionKindsInResolutionOrder:
>     Seq[SessionCatalog.SessionFunctionKind] =
>   sessionFunctionKindsTestOverride.getOrElse {
>     catalogManagerForSessionFunctionKinds match {
>       case Some(cm) =>
>         CatalogManager.systemFunctionKindsFromPath(
>           cm.sqlResolutionPathEntries(cm.currentCatalog.name(),
>             cm.currentNamespace.toSeq))
>       case None =>
>         CatalogManager.systemFunctionKindsFromPath(conf.systemPathOrder)
>     }
>   }
>
> So: lookup acquires the {{SessionCatalog}} intrinsic lock, then needs the
> {{CatalogManager}} intrinsic lock.
> Arm 2: {{CatalogManager.synchronized}} → {{SessionCatalog.synchronized}}
> {{CatalogManager.setCurrentNamespace}}, {{setCurrentCatalog}},
> {{setCurrentCatalogAndNamespace}}, and {{setCurrentCatalogWithoutCheck}}
> all hold {{CatalogManager.synchronized}} and then call back into
> {{v1SessionCatalog.setCurrentDatabase*}}, which acquires the
> {{SessionCatalog}} lock:
> def setCurrentNamespace(namespace: Array[String]): Unit = synchronized {
>   if (isSessionCatalog(currentCatalog) && namespace.length == 1) {
>     v1SessionCatalog.setCurrentDatabaseWithNameCheck(
>       namespace.head,
>       _ => assertNamespaceExist(namespace))
>   } else {
>     assertNamespaceExist(namespace)
>   }
>   _currentNamespace = Some(namespace)
> }
> So: {{USE SCHEMA}}/{{USE CATALOG}} acquire the {{CatalogManager}}
> lock, then need the {{SessionCatalog}} lock.
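> The two arms can be modelled without Spark. The sketch below is a toy
> reproduction only: {{ToyCatalog}}, {{ToyManager}}, and every method on them
> are hypothetical stand-ins, not the real {{SessionCatalog}} or
> {{CatalogManager}}. A latch forces the bad interleaving so that each thread
> holds its first lock before asking for the second.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}

// Toy model: ToyCatalog and ToyManager are hypothetical stand-ins, not Spark classes.
object LockOrderInversionDemo {
  final class ToyManager {
    @volatile private var v1Catalog: ToyCatalog = null
    private var namespace = "default"

    def bind(catalog: ToyCatalog): Unit = { v1Catalog = catalog }
    def currentNamespace: String = synchronized { namespace }

    // Arm 2: take the manager lock, then call into the catalog (catalog lock).
    def setCurrentNamespace(ns: String, bothHolding: CountDownLatch): Unit = synchronized {
      bothHolding.countDown(); bothHolding.await() // force the bad interleaving
      v1Catalog.setCurrentDatabase(ns)
      namespace = ns
    }
  }

  final class ToyCatalog(manager: ToyManager) {
    private var database = "default"
    def setCurrentDatabase(db: String): Unit = synchronized { database = db }

    // Arm 1: take the catalog lock, then read manager state (manager lock).
    def lookupFunction(name: String, bothHolding: CountDownLatch): String = synchronized {
      bothHolding.countDown(); bothHolding.await() // force the bad interleaving
      s"${manager.currentNamespace}.$name"
    }
  }

  /** Starts both arms and returns the two thread states once both are stuck. */
  def run(): (Thread.State, Thread.State) = {
    val manager = new ToyManager
    val catalog = new ToyCatalog(manager)
    manager.bind(catalog)
    val bothHolding = new CountDownLatch(2)
    val lookup = new Thread(() => { catalog.lookupFunction("f", bothHolding); () }, "toy-lookup")
    val use = new Thread(() => manager.setCurrentNamespace("s1", bothHolding), "toy-use")
    // Daemon threads, so the deadlocked pair does not keep the JVM alive.
    Seq(lookup, use).foreach { t => t.setDaemon(true); t.start() }
    val deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(10)
    def stuck: Boolean =
      lookup.getState == Thread.State.BLOCKED && use.getState == Thread.State.BLOCKED
    while (!stuck && System.nanoTime() < deadline) Thread.sleep(10)
    (lookup.getState, use.getState)
  }
}
```

> In this toy model both threads are expected to end up in
> {{Thread.State.BLOCKED}}: each one holds the monitor the other needs, which
> is the same shape as Arm 1 and Arm 2 above.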
> h2. Triggering conditions
> The deadlock requires two threads sharing one {{SparkSession}} such that:
> # One thread is executing any SQL that contains an unqualified function
> reference (extremely common), so it runs {{lookupFunctionInfo}}.
> # The other thread is executing {{USE SCHEMA …}}, {{USE CATALOG …}},
> or {{USE catalog.schema}}, so it runs the matching
> {{setCurrentNamespace}}/{{setCurrentCatalog}} path.
> Both can also be the _same_ thread interleaved across futures if any rule
> invoked from inside an analyzer scope triggers a {{USE}}-equivalent.
> h2. Independence from {{spark.sql.functionResolution.sessionOrder}}
> The hazard is independent of the function-resolution order setting
> ({{first}} / {{second}} / {{last}}):
> * Arm 1 acquires {{CatalogManager.synchronized}} to _read what the order
> is_. Whatever the order value, the lock is taken.
> * Arm 2 is a property of {{USE}}-style state mutation and has nothing to
> do with the order at all.
> h2. Why this hasn't been caught earlier
> The in-source comment on {{SessionCatalog.lookupBuiltinOrTempFunction}} is
> explicit about Arm 1: the method is intentionally _not_ {{synchronized}}, to
> avoid this very cycle. That partial mitigation covers only
> {{lookupBuiltinOrTempFunction}} itself; {{lookupFunctionInfo}},
> {{lookupFunctionInfoByIdentifier}}, and other {{synchronized}} callers
> that reach
> {{lookupBuiltinOrTempFunction}}/{{lookupPersistentFunction}} retain
> the hazard.
> The concurrent-{{SET PATH}} smoke test in {{SetPathSuite}} added by
> SPARK-56853 surfaces only the contention variant of this hazard.
> {{SET PATH}} does not itself call back into {{v1SessionCatalog}}, so it
> produces lock contention but no true deadlock: the test fails by _timeout
> under load_, not by a hang. A targeted test with {{USE SCHEMA}} on the
> setter thread reliably reproduces the full hang.
> h2. Proposed fix
> Two minimal changes:
> # Drop {{synchronized}} on {{SessionCatalog.lookupFunctionInfo}} (and any
> peers that share the hazard). Document that unqualified resolution must not
> hold the catalog's intrinsic lock while consulting the bound
> {{CatalogManager}}. This kills Arm 1.
> # In
> {{CatalogManager.setCurrentNamespace}}/{{setCurrentCatalog}}/{{setCurrentCatalogAndNamespace}}/{{setCurrentCatalogWithoutCheck}},
> narrow the {{synchronized}} region so the v1-catalog callbacks run _outside_
> the manager's intrinsic lock. The state read needed for the decision can be
> snapshotted under the lock, the callback run outside it, and the
> namespace/catalog write done under the lock. This kills Arm 2.
> Either change alone breaks the cycle; both make the locking discipline robust
> against future regressions.
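> The snapshot / callback / write sequence from the second change can be
> sketched as follows. This is an illustration under assumptions, not the
> actual patch: {{ToyCatalog}}, {{ToyManager}}, and the {{usingSessionCatalog}}
> flag are hypothetical, and the real setters also validate the namespace.

```scala
// Toy model: ToyCatalog and ToyManager are hypothetical stand-ins, not Spark classes.
object NarrowedLockSketch {
  final class ToyCatalog {
    private var database = "default"
    def setCurrentDatabase(db: String): Unit = synchronized { database = db }
    def currentDatabase: String = synchronized { database }
  }

  final class ToyManager(v1Catalog: ToyCatalog) {
    // Hypothetical flag standing in for "the current catalog is the session catalog".
    private var usingSessionCatalog = true
    private var namespace: Seq[String] = Seq("default")

    def currentNamespace: Seq[String] = synchronized { namespace }

    def setCurrentNamespace(ns: Seq[String]): Unit = {
      // 1. Snapshot the state the decision depends on, under the manager lock.
      val routeToV1 = synchronized { usingSessionCatalog && ns.length == 1 }
      // 2. Run the v1 callback with the manager lock released.
      assert(!Thread.holdsLock(this), "callback must not run under the manager lock")
      if (routeToV1) v1Catalog.setCurrentDatabase(ns.head)
      // 3. Publish the new namespace under the manager lock again.
      synchronized { namespace = ns }
    }
  }
}
```

> One trade-off of this shape: the state snapshotted in step 1 can change
> before step 3 runs, so a real implementation has to decide whether to
> revalidate the snapshot before publishing. The sketch does not.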
> For the path-derived kinds, introduce a
> {{CatalogManager.sessionFunctionKindsForUnqualifiedResolution}} helper that
> snapshots {{currentCatalog}}, {{currentNamespace}}, and the path
> entries together under a single critical section; then {{SessionCatalog}} and
> {{FunctionResolution.isSessionBeforeBuiltinInPath}} consult that one helper
> instead of taking two separate reads, eliminating torn-state observations
> under racing path updates.
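> A minimal sketch of the single-critical-section snapshot idea, assuming a
> toy manager with three fields. {{ToyManager}}, {{ResolutionSnapshot}}, and
> the placeholder path strings are hypothetical; only the pattern (one lock
> acquisition that returns an immutable value) is the point.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Toy model: ToyManager and ResolutionSnapshot are hypothetical, not Spark classes.
object PathSnapshotSketch {
  final case class ResolutionSnapshot(
      catalog: String, namespace: Seq[String], pathEntries: Seq[String])

  final class ToyManager {
    private var catalog = "c1"
    private var namespace = Seq("n1")
    private var path = Seq("builtin", "session") // placeholder entries

    def use(newCatalog: String, newNamespace: Seq[String]): Unit = synchronized {
      catalog = newCatalog
      namespace = newNamespace
    }

    // All three fields are read under one lock acquisition, so callers never
    // see a catalog from one update paired with a namespace from another.
    def snapshotForUnqualifiedResolution: ResolutionSnapshot = synchronized {
      ResolutionSnapshot(catalog, namespace, path)
    }
  }

  /** Reads snapshots while a writer flips state; counts inconsistent pairs. */
  def countTornSnapshots(reads: Int): Int = {
    val manager = new ToyManager
    val stop = new AtomicBoolean(false)
    val writer = new Thread(() => {
      var i = 0
      while (!stop.get()) {
        if (i % 2 == 0) manager.use("c2", Seq("n2")) else manager.use("c1", Seq("n1"))
        i += 1
      }
    }, "toy-writer")
    writer.setDaemon(true)
    writer.start()
    val torn = (0 until reads).count { _ =>
      val s = manager.snapshotForUnqualifiedResolution
      val expected = if (s.catalog == "c1") Seq("n1") else Seq("n2")
      s.namespace != expected
    }
    stop.set(true)
    writer.join()
    torn
  }
}
```

> Two separate getter calls could interleave with {{use}} and pair {{c1}}
> with {{n2}}; the combined snapshot cannot, which is the property the helper
> is meant to provide.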
> h2. Repro
> A standalone unit suite using {{SparkSession}} (no DBR-specific catalog
> wrappers needed):
> import java.util.concurrent.{ConcurrentLinkedQueue, CyclicBarrier}
> import org.apache.spark.sql.SparkSession
>
> val session = SparkSession.builder().master("local[*]").getOrCreate()
> val barrier = new CyclicBarrier(2)
> val errors = new ConcurrentLinkedQueue[Throwable]()
>
> // Arm 1: every query resolves the unqualified function count().
> val lookupThread = new Thread(() => {
>   try {
>     barrier.await()
>     for (_ <- 0 until 200) {
>       session.sql("SELECT count(*) FROM VALUES (1), (2), (3) AS t(a)").head()
>     }
>   } catch { case t: Throwable => errors.add(t) }
> }, "lookup")
>
> // Arm 2: alternate USE SCHEMA between two schemas.
> val useThread = new Thread(() => {
>   try {
>     session.sql("CREATE SCHEMA IF NOT EXISTS s1")
>     session.sql("CREATE SCHEMA IF NOT EXISTS s2")
>     barrier.await()
>     for (i <- 0 until 200) {
>       session.sql(if ((i % 2) == 0) "USE SCHEMA s1" else "USE SCHEMA s2")
>     }
>   } catch { case t: Throwable => errors.add(t) }
> }, "use-schema")
>
> lookupThread.start(); useThread.start()
> lookupThread.join(30000); useThread.join(30000)
> require(!lookupThread.isAlive && !useThread.isAlive,
>   "deadlock detected: SessionCatalog/CatalogManager lock-order inversion")
> Under contention both threads end up {{BLOCKED}}, each waiting to enter the
> intrinsic lock the other holds; a thread dump shows the cycle directly.
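> As a possible complement to the join timeout, the cycle can also be read
> programmatically through the standard JVM {{ThreadMXBean}}. The helper
> below is a sketch; {{DeadlockReport}} and {{describe}} are hypothetical
> names, and only {{java.lang.management}} calls are used.

```scala
import java.lang.management.ManagementFactory

// Hypothetical helper: lists the threads the JVM reports as monitor-deadlocked.
object DeadlockReport {
  /** One line per deadlocked thread; empty when the JVM finds no monitor cycle. */
  def describe(): Seq[String] = {
    val mx = ManagementFactory.getThreadMXBean
    // findMonitorDeadlockedThreads returns null when there is no deadlock.
    Option(mx.findMonitorDeadlockedThreads()).toSeq.flatMap { ids =>
      mx.getThreadInfo(ids).toSeq.filter(_ != null).map { info =>
        s"${info.getThreadName} blocked on ${info.getLockName} " +
          s"held by ${info.getLockOwnerName}"
      }
    }
  }
}
```

> In the repro, the result of {{DeadlockReport.describe()}} could be appended
> to the {{require}} message so a failing run names both threads and the
> locks they are waiting for.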