This is an automated email from the ASF dual-hosted git repository.

cloud-fan pushed a commit to branch branch-4.2
in repository https://gitbox.apache.org/repos/asf/spark.git

commit 0d768187867661a983811dc93b6c0f8f9f3cb38d
Author: Serge Rielau <[email protected]>
AuthorDate: Tue May 12 10:48:40 2026 -0700

    [SPARK-56750] default path config
    
    ### What changes were proposed in this pull request?
    
    This PR implements 
**[SPARK-56750](https://issues.apache.org/jira/browse/SPARK-56750)** by adding 
a default SQL PATH (parallel to the default catalog) and wiring path-driven 
semantics through function and namespace resolution when 
`spark.sql.path.enabled` is true.
    
    #### Core feature
    
    - **New config `spark.sql.defaultPath`** (`SQLConf.DEFAULT_PATH`):
      - String session conf (default empty), accepts the full SET PATH grammar.
      - When PATH is enabled and no explicit `SET PATH` was issued, this value 
becomes the effective SQL PATH (mirroring how `currentCatalog` falls back to 
`spark.sql.defaultCatalog`).
      - `SET PATH = DEFAULT_PATH` expands to this configured default.
      - An inner `DEFAULT_PATH` token inside the conf value resolves to the 
Spark built-in default ordering (cycle-safe expansion).
      - The conf is validated for syntax at set time; redundant entries are 
tolerated at lookup time (first-match resolution makes them harmless).
    
    - **Shared SET PATH grammar**: A new `singlePathElementList` rule and 
`parsePathElements` parser entry let `spark.sql.defaultPath` reuse the exact 
`SET PATH` grammar without re-implementation.
    
    - **`CatalogManager` PATH materialization**: A single `sessionPathEntries` 
resolves stored `SET PATH`, parsed `DEFAULT_PATH`, and the seeded 
`defaultPathOrder` fallback into one effective path. `CURRENT_PATH()` and 
unqualified function/relation resolution all read from this single source of 
truth.
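    
    The fallback chain above can be sketched roughly as follows. This is a 
    minimal, self-contained model and not the code in `CatalogManager`: the 
    types `Elem`, `DefaultPathToken`, `Schema`, the helper names, and the 
    contents of `builtinDefault` are all assumptions made for illustration.
    
    ```scala
    // Hypothetical stand-in types; the real PathElement / CatalogManager differ.
    object EffectivePathSketch {
      sealed trait Elem
      case object DefaultPathToken extends Elem            // the DEFAULT_PATH keyword
      final case class Schema(parts: Seq[String]) extends Elem

      // Assumed seed ordering; the real built-in default may contain more entries.
      val builtinDefault: Seq[Seq[String]] =
        Seq(Seq("system", "builtin"), Seq("system", "session"))

      // Expand the parsed conf value. An inner DEFAULT_PATH token resolves to the
      // built-in ordering rather than to the conf itself, so expansion cannot loop.
      def expandConf(conf: Seq[Elem]): Seq[Seq[String]] = conf.flatMap {
        case DefaultPathToken => builtinDefault
        case Schema(parts)    => Seq(parts)
      }

      // Precedence: explicit SET PATH, then the conf default, then the seed.
      def effectivePath(
          stored: Option[Seq[Seq[String]]],
          conf: Seq[Elem]): Seq[Seq[String]] =
        stored.getOrElse(if (conf.nonEmpty) expandConf(conf) else builtinDefault)
    }
    ```
    
    In this model an empty conf behaves like "no default configured", which 
    mirrors the empty-string default described above.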
    
    #### Path-driven function resolution
    
    - **Fast-path kinds derived from the live PATH**: `SessionCatalog`'s 
unqualified-lookup fast path receives an ordered list of `system.builtin` / 
`system.session` kinds from the effective PATH (in path order) via a 
path-driven provider. This replaces the previous reliance on 
`spark.sql.functionResolution.sessionOrder` (which is now used solely as a seed 
for `defaultPathOrder` and may eventually be retired).
    
    - **The path is the path**: `current_path` is treated like 
`current_catalog` and `current_schema` — we don't distinguish whether it was 
set via `SET PATH`, via the `spark.sql.defaultPath` conf, or by the seeded 
default. Whatever is on the path is searched as written. Placing 
`system.session` after a user catalog is the user's authorization for 
unqualified temp functions to resolve through that position.
    
    - **Security check via path order**: Creating a temp function with a 
builtin's name is blocked when `system.session` is searched before 
`system.builtin`. The "no injection of temp over persistent" property is 
enforced by path order itself: when a user catalog precedes `system.session`, 
persistent resolution wins for unqualified names.
    
    - **`COUNT(*) → COUNT(1)` rewrite gate**: The analyzer rewrite is skipped 
when a temp `count` would shadow the builtin under the live PATH. It shares the 
same canonical predicate (`FunctionResolution.isSessionBeforeBuiltinInPath`) as 
the security check.
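    
    A rough sketch of how the shared predicate can be derived from the path. 
    The `headOption.contains(Temp)` step matches the diff below; the body of 
    `systemKinds` is an assumption, since the real 
    `CatalogManager.systemFunctionKindsFromPath` is not shown in this excerpt, 
    and paths are modelled here as plain `Seq[Seq[String]]`.
    
    ```scala
    import java.util.Locale

    // Hypothetical model: path entries are plain name parts, not SessionPathEntry.
    object PathKindsSketch {
      sealed trait Kind
      case object Builtin extends Kind
      case object Temp extends Kind

      // Keep only system.builtin / system.session entries, in path order.
      // Case-insensitive matching is an assumption of this sketch.
      def systemKinds(path: Seq[Seq[String]]): Seq[Kind] =
        path.map(_.map(_.toLowerCase(Locale.ROOT))).collect {
          case Seq("system", "builtin") => Builtin
          case Seq("system", "session") => Temp
        }

      // True iff system.session is the first system entry on the path.
      def sessionBeforeBuiltin(path: Seq[Seq[String]]): Boolean =
        systemKinds(path).headOption.contains(Temp)
    }
    ```
    
    With a path such as `system.builtin, <user>, system.session` the predicate 
    is false, so neither the security check nor the rewrite gate would fire in 
    this model.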
    
    #### Layering
    
    - Resolution-order logic lives at the Strategy layer 
(`FunctionResolution`), not Coordination (`CatalogManager`). `CatalogManager` 
provides path materialization (`sqlResolutionPathEntries`) and pure data 
helpers (`systemFunctionKindsFromPath`) — it doesn't decide resolution order. 
`SessionCatalog`'s default kinds provider derives from `conf.systemPathOrder` 
(the seeded default path) and never branches on the legacy resolution-order 
conf.
    
    #### Deadlock-safe `SessionCatalog` lookups
    
    - `lookupBuiltinOrTempTableFunction` and `lookupFunctionInfo` are 
intentionally not synchronized on `SessionCatalog`. The path-driven kinds 
provider may acquire `CatalogManager.synchronized`; another thread holding that 
lock can call into `SessionCatalog` (e.g. via `setCurrentNamespace`), so 
synchronizing the public lookup methods would introduce lock-order inversion. 
Tighter `synchronized` blocks are kept around the temp-view-context 
post-processing where they're actually needed.
    
    #### Duplicate validation
    
    - `SET PATH` rejects **static** duplicates (literal-vs-literal, repeated 
`current_schema` / `current_database`) at statement time as a typo guard.
    - A literal that happens to match the live `current_schema` is **not** 
flagged: the duplicate is `USE SCHEMA`-state-dependent and harmless under 
first-match resolution.
    - `spark.sql.defaultPath` runs no semantic duplicate check at lookup time: 
the previous lookup-time validation could wedge a session by throwing on every 
unqualified resolution once `USE SCHEMA` made a literal collide with 
`current_schema`.
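    
    The static check could look roughly like the sketch below. It is not the 
    `validateNoStaticDuplicates` helper from this patch: the `Elem` types, the 
    function name, and the case-insensitive comparison of literals are 
    assumptions. The point it illustrates is that only the written elements are 
    compared, so a literal never collides with `current_schema` here.
    
    ```scala
    import java.util.Locale
    import scala.collection.mutable

    // Hypothetical model of parsed SET PATH elements.
    object StaticDuplicateSketch {
      sealed trait Elem
      case object CurrentSchema extends Elem
      final case class Literal(parts: Seq[String]) extends Elem

      // Returns the first statically repeated element (normalized), if any.
      // CurrentSchema is compared only with itself, never with a literal,
      // because its value depends on USE SCHEMA state at lookup time.
      def firstStaticDuplicate(elems: Seq[Elem]): Option[Elem] = {
        val seen = mutable.Set.empty[Elem]
        val normalized: Seq[Elem] = elems.map {
          case Literal(p) => Literal(p.map(_.toLowerCase(Locale.ROOT)))
          case other      => other
        }
        // Set.add returns false when the element was already present.
        normalized.find(e => !seen.add(e))
      }
    }
    ```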
    
    #### `DROP TEMPORARY FUNCTION` correctness
    
    - Unqualified `DROP TEMPORARY FUNCTION <builtin>` without `IF EXISTS` 
raises `FORBIDDEN_OPERATION` against the builtin name as captured.
    - `IF EXISTS` remains a no-op when no temp function exists.
    - Qualified temporary namespaces (`session.foo`, `system.session.foo`) 
always target the temp registry.
    
    #### `PathElement` AST
    
    - New `PathElement` AST in catalyst 
(`org.apache.spark.sql.connector.catalog.PathElement`, `private[sql]`) with 
`expand` and `validateNoStaticDuplicates` helpers shared between 
`SetPathCommand` and `CatalogManager.confDefaultPathEntries`.
    
    #### Performance
    
    - `CatalogManager.confDefaultPathEntries` caches expanded entries by 
`(trimmed conf value, sessionFunctionResolutionOrder)`; on conf-stable sessions 
the lookup hot path is one `AtomicReference.get`. `CurrentSchemaEntry` markers 
are preserved unresolved so the cache stays valid across `USE SCHEMA`.
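    
    A minimal sketch of a single-entry cache keyed this way, assuming a 
    recomputation that is idempotent so that racing writers are harmless. 
    `KeyedCache` is a hypothetical name and not part of the patch; only 
    `java.util.concurrent.atomic.AtomicReference` is a real API here.
    
    ```scala
    import java.util.concurrent.atomic.AtomicReference

    // Hypothetical single-entry cache: remembers the last (key, value) pair.
    final class KeyedCache[K, V](compute: K => V) {
      private val ref = new AtomicReference[Option[(K, V)]](None)

      def get(key: K): V = ref.get() match {
        case Some((k, v)) if k == key => v   // hot path: one atomic read
        case _ =>
          val v = compute(key)               // key changed or cache is empty
          ref.set(Some((key, v)))            // last writer wins
          v
      }
    }
    ```
    
    Keeping markers such as the current schema unresolved in the cached value, 
    as described above, is what lets a key of only `(conf value, resolution 
    order)` stay valid across `USE SCHEMA`.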
    
    #### Tests
    
    - `SetPathSuite` (47 tests) covers: lazy fallback, explicit override, `SET 
PATH = DEFAULT_PATH`, cycle break, invalid value, PATH disabled, path-driven 
security, user-only path, duplicate tolerance for `DEFAULT_PATH`, static 
duplicate rejection at SET PATH, the explicit-interleaved path case 
(`system.builtin, <user>, system.session` resolving a temp), and the 
leading-system-kinds else-branch.
    - `FunctionQualificationSuite` and `RelationQualificationSuite` continue to 
pass unchanged (130/130 across the three suites).
    
    ### Why are the changes needed?
    
    Users need an OSS way to define a default SQL PATH before any explicit `SET 
PATH`, mirroring how `spark.sql.defaultCatalog` provides a session default for 
the current catalog. The change also fixes a semantic gap where path-driven 
function-resolution shortcuts (the `COUNT(*)` rewrite gate, the temp-vs-builtin 
security check) ignored `SET PATH` and only consulted the legacy 
`spark.sql.functionResolution.sessionOrder` proxy — so `SET PATH = 
system.session, system.builtin, ...` did not b [...]
    
    ### Does this PR introduce *any* user-facing change?
    
    **Yes.**
    
    - New conf: **`spark.sql.defaultPath`**.
    - With `spark.sql.path.enabled=true`, sessions without explicit `SET PATH` 
resolve using the configured default path.
    - `SET PATH = DEFAULT_PATH` and `CURRENT_PATH()` reflect the configured 
default.
    - Function resolution order for unqualified names follows the effective 
PATH (post-`SET PATH`, with `DEFAULT_PATH` and `defaultPathOrder` fallbacks). 
The temp-vs-builtin security check and the `COUNT(*) → COUNT(1)` rewrite gate 
now reflect `SET PATH` changes.
    - Subtle behavior change in the previously-impossible "temp-only function 
in `sessionFunctionResolutionOrder=last`" case: the unqualified call now 
resolves to the temp function instead of failing with `UNRESOLVED_ROUTINE`. 
This matches the documented intent of "`builtin → persistent → session`". The 
"no temp injection over persistent" property is preserved by path order.
    
    ### How was this patch tested?
    
    - `build/sbt "sql/testOnly org.apache.spark.sql.SetPathSuite"`
    - `build/sbt "sql/testOnly *FunctionQualificationSuite 
*RelationQualificationSuite"`
    - `build/sbt "sql/testOnly *DDLSuite -- -z \"drop built-in function\""`
    - `build/sbt "catalyst/Compile/compile" "sql/Compile/compile" 
"sql/Test/compile"`
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Yes. Some implementation and tests were iterated with coding-assistant 
tooling; the author reviewed and owns the final patch.
    
    Closes #55717 from srielau/SPARK-56750-default-path.
    
    Authored-by: Serge Rielau <[email protected]>
    Signed-off-by: Daniel Tenedorio <[email protected]>
---
 .../spark/sql/catalyst/parser/SqlBaseParser.g4     |   4 +
 .../spark/sql/connector/catalog/CatalogPlugin.java |   9 +-
 .../spark/sql/catalyst/analysis/Analyzer.scala     |   7 +-
 .../sql/catalyst/analysis/FunctionResolution.scala |  16 ++
 .../analysis/resolver/FunctionResolver.scala       |   2 +-
 .../analysis/resolver/FunctionResolverUtils.scala  |  19 +-
 .../resolver/HigherOrderFunctionResolver.scala     |   2 +-
 .../sql/catalyst/catalog/SessionCatalog.scala      | 100 ++++++--
 .../sql/catalyst/parser/AbstractSqlParser.scala    |  13 +
 .../spark/sql/catalyst/parser/AstBuilder.scala     |  22 +-
 .../sql/connector/catalog/CatalogManager.scala     | 120 ++++++++--
 .../spark/sql/connector/catalog/PathElement.scala  | 150 ++++++++++++
 .../org/apache/spark/sql/internal/SQLConf.scala    |  27 +++
 .../spark/sql/execution/SparkSqlParser.scala       |  10 +-
 .../sql/execution/command/SetPathCommand.scala     |  81 +------
 .../spark/sql/execution/command/functions.scala    |  10 +-
 .../scala/org/apache/spark/sql/SetPathSuite.scala  | 261 ++++++++++++++++++++-
 17 files changed, 707 insertions(+), 146 deletions(-)

diff --git 
a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 
b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
index 1e3acbc001b3..5761028f6023 100644
--- 
a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
+++ 
b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
@@ -217,6 +217,10 @@ singleTableSchema
     : colTypeList EOF
     ;
 
+singlePathElementList
+    : pathElement (COMMA pathElement)* EOF
+    ;
+
 singleRoutineParamList
     : colDefinitionList EOF
     ;
diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/CatalogPlugin.java
 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/CatalogPlugin.java
index 23f3acc7230f..20586f57bcfd 100644
--- 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/CatalogPlugin.java
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/CatalogPlugin.java
@@ -28,9 +28,10 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap;
  * views, and functions.
  * <p>
  * Catalog implementations must implement this marker interface to be loaded by
- * {@link Catalogs#load(String, SQLConf)}. The loader will instantiate catalog 
classes using the
+ * {@link 
org.apache.spark.sql.connector.catalog.Catalogs#load(String,SQLConf)}.
+ * The loader will instantiate catalog classes using the
  * required public no-arg constructor. After creating an instance, it will be 
configured by calling
- * {@link #initialize(String, CaseInsensitiveStringMap)}.
+ * {@link #initialize(String,CaseInsensitiveStringMap)}.
  * <p>
  * Catalog implementations are registered to a name by adding a configuration 
option to Spark:
  * {@code spark.sql.catalog.catalog-name=com.example.YourCatalogClass}. All 
configuration properties
@@ -56,8 +57,8 @@ public interface CatalogPlugin {
   /**
    * Called to get this catalog's name.
    * <p>
-   * This method is only called after {@link #initialize(String, 
CaseInsensitiveStringMap)} is
-   * called to pass the catalog's name.
+   * This method is only called after
+   * {@link #initialize(String,CaseInsensitiveStringMap)} is called to pass 
the catalog's name.
    */
   String name();
 
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 739ef41c7791..da2e57c0a649 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1980,14 +1980,15 @@ class Analyzer(
      * This is used for special syntax transformations (e.g., COUNT(*) -> 
COUNT(1)) that
      * should only apply to builtin functions, not to user-defined functions.
      *
-     * In legacy mode (sessionOrder="first"), temp functions shadow builtins, 
so an
-     * unqualified name that matches a temp function should NOT be treated as 
builtin.
+     * When the effective SQL PATH puts `system.session` before 
`system.builtin`, temp
+     * functions shadow builtins, so an unqualified name that matches a temp 
function
+     * should NOT be treated as builtin.
      */
     private def matchesFunctionName(nameParts: Seq[String], expectedName: 
String): Boolean = {
       if (!FunctionResolution.isUnqualifiedOrBuiltinFunctionName(nameParts, 
expectedName)) {
         return false
       }
-      if (nameParts.size == 1 && conf.sessionFunctionResolutionOrder == 
"first") {
+      if (nameParts.size == 1 && 
functionResolution.isSessionBeforeBuiltinInPath) {
         val v1Catalog = catalogManager.v1SessionCatalog
         !v1Catalog.isTemporaryFunction(FunctionIdentifier(nameParts.head))
       } else {
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionResolution.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionResolution.scala
index 4721425d2cb5..e3dbcc4b6ef7 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionResolution.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionResolution.scala
@@ -76,6 +76,22 @@ class FunctionResolution(
     nameParts.length == 3 &&
       nameParts.head.equalsIgnoreCase(CatalogManager.SYSTEM_CATALOG_NAME)
 
+  /**
+   * True iff `system.session` is searched before `system.builtin` in the 
effective SQL PATH.
+   *
+   * Drives the `count(*) -> count(1)` rewrite (which must skip transformation 
when a temp
+   * `count` shadows the builtin) and the `SessionCatalog` security check that 
blocks creating
+   * a temp function with a builtin's name. Reads the live PATH via 
`CatalogManager` and
+   * applies the same kinds extraction that drives `SessionCatalog`'s 
fast-path provider, so
+   * the predicate stays in sync with the lookup loop's actual order.
+   */
+  def isSessionBeforeBuiltinInPath: Boolean = {
+    val path = catalogManager.sqlResolutionPathEntries(
+      catalogManager.currentCatalog.name(), 
catalogManager.currentNamespace.toSeq)
+    CatalogManager.systemFunctionKindsFromPath(path).headOption
+      .contains(org.apache.spark.sql.catalyst.catalog.SessionCatalog.Temp)
+  }
+
   /**
    * Produces the ordered list of candidate names for resolution. Expansion 
happens in two cases:
    *
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/FunctionResolver.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/FunctionResolver.scala
index 1a8658bb764d..dd70963a7984 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/FunctionResolver.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/FunctionResolver.scala
@@ -60,7 +60,7 @@ import org.apache.spark.sql.catalyst.util.CollationFactory
  */
 class FunctionResolver(
     expressionResolver: ExpressionResolver,
-    functionResolution: FunctionResolution,
+    protected val functionResolution: FunctionResolution,
     aggregateExpressionResolver: AggregateExpressionResolver,
     binaryArithmeticResolver: BinaryArithmeticResolver)
     extends TreeNodeResolver[UnresolvedFunction, Expression]
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/FunctionResolverUtils.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/FunctionResolverUtils.scala
index 503c94fc9cdf..3c5a3f1832e8 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/FunctionResolverUtils.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/FunctionResolverUtils.scala
@@ -19,7 +19,9 @@ package org.apache.spark.sql.catalyst.analysis.resolver
 
 import java.util.Locale
 
+import org.apache.spark.sql.catalyst.FunctionIdentifier
 import org.apache.spark.sql.catalyst.analysis.{
+  FunctionResolution,
   ResolvedStar,
   Star,
   UnresolvedFunction,
@@ -35,6 +37,7 @@ import org.apache.spark.sql.internal.SQLConf
  */
 trait FunctionResolverUtils {
   protected def expressionResolver: ExpressionResolver
+  protected def functionResolution: FunctionResolution
   protected def conf: SQLConf
 
   private val scopes = expressionResolver.getNameScopes
@@ -99,7 +102,21 @@ trait FunctionResolverUtils {
       unresolvedFunction: UnresolvedFunction,
       normalizeFunctionName: Boolean = true
   ): Boolean = {
-    !unresolvedFunction.isDistinct && isCount(unresolvedFunction, 
normalizeFunctionName)
+    !unresolvedFunction.isDistinct &&
+      isCount(unresolvedFunction, normalizeFunctionName) &&
+      !isUnqualifiedCountShadowedByTemp(unresolvedFunction)
+  }
+
+  /**
+   * Keep single-pass behavior aligned with fixed-point: when PATH puts 
system.session before
+   * system.builtin and a temp `count` exists, unqualified `count(*)` must not 
be rewritten to
+   * `count(1)`.
+   */
+  private def isUnqualifiedCountShadowedByTemp(unresolvedFunction: 
UnresolvedFunction): Boolean = {
+    unresolvedFunction.nameParts.length == 1 &&
+      functionResolution.isSessionBeforeBuiltinInPath &&
+      functionResolution.catalogManager.v1SessionCatalog
+        
.isTemporaryFunction(FunctionIdentifier(unresolvedFunction.nameParts.head))
   }
 
   private def isCount(
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/HigherOrderFunctionResolver.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/HigherOrderFunctionResolver.scala
index 6b90a5c05baf..676ef381f2f1 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/HigherOrderFunctionResolver.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/HigherOrderFunctionResolver.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.errors.QueryCompilationErrors
  */
 class HigherOrderFunctionResolver(
     protected val expressionResolver: ExpressionResolver,
-    functionResolution: FunctionResolution)
+    protected val functionResolution: FunctionResolution)
     extends TreeNodeResolver[UnresolvedFunction, Expression]
     with ProducesUnresolvedSubtree
     with CoercesExpressionTypes
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 9c863a7b55fe..71653eec139b 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -113,18 +113,67 @@ class SessionCatalog(
     identifier.copy(funcName = "") == SESSION_NAMESPACE_TEMPLATE
 
   /**
-   * Session function kinds in resolution order for unqualified lookups.
-   * Matches [[SQLConf.sessionFunctionResolutionOrder]]: "first" (session 
first),
-   * "second" (default), "last" (builtin only; session tried after persistent).
+   * When set, unqualified builtin/temp function resolution uses this fixed 
kind order instead of
+   * [[catalogManagerForSessionFunctionKinds]] / [[SQLConf.systemPathOrder]]. 
For unit tests only;
+   * production relies on the catalog manager binding.
    */
-  private def sessionFunctionKindsInResolutionOrder: Seq[SessionFunctionKind] 
= {
-    conf.sessionFunctionResolutionOrder match {
-      case "first" => Seq(Temp, Builtin)
-      case "last" => Seq(Builtin)
-      case _ => Seq(Builtin, Temp) // "second" (default)
-    }
+  @volatile private var sessionFunctionKindsTestOverride: 
Option[Seq[SessionFunctionKind]] = None
+
+  /**
+   * Live PATH for session function kinds. Set from
+   * [[org.apache.spark.sql.connector.catalog.CatalogManager]]'s constructor 
via
+   * [[bindCatalogManagerForSessionFunctionKinds]] so unqualified lookups and 
the security check
+   * that blocks temp functions from shadowing builtins read the effective SQL 
PATH (post-`SET
+   * PATH`, with [[SQLConf.DEFAULT_PATH]] and [[SQLConf.defaultPathOrder]] 
fallbacks already
+   * applied).
+   *
+   * When unset (e.g. standalone [[SessionCatalog]] in tests), kinds derive 
from
+   * [[SQLConf.systemPathOrder]] -- the seeded default path -- without 
assuming other legacy
+   * resolution-order conf beyond seeding `defaultPathOrder`.
+   */
+  @volatile private var catalogManagerForSessionFunctionKinds: 
Option[CatalogManager] = None
+
+  /**
+   * Wire live PATH-derived session function kinds from the session 
[[CatalogManager]].
+   * Called once from 
[[org.apache.spark.sql.connector.catalog.CatalogManager]]'s constructor.
+   */
+  private[sql] def bindCatalogManagerForSessionFunctionKinds(cm: 
CatalogManager): Unit = {
+    catalogManagerForSessionFunctionKinds = Some(cm)
+  }
+
+  /**
+   * Pin session function kinds for tests (`None` clears). Uses `private[sql]` 
so tests under the
+   * `org.apache.spark.sql` package can control ordering without a public 
catalog API.
+   */
+  private[sql] def setSessionFunctionKindsTestOverride(
+      kinds: Option[Seq[SessionFunctionKind]]): Unit = {
+    sessionFunctionKindsTestOverride = kinds
   }
 
+  /**
+   * Session function kinds in resolution order for unqualified lookups: test 
override if set,
+   * else live PATH from [[catalogManagerForSessionFunctionKinds]], else
+   * [[SQLConf.systemPathOrder]].
+   */
+  private def sessionFunctionKindsInResolutionOrder: Seq[SessionFunctionKind] =
+    sessionFunctionKindsTestOverride.getOrElse {
+      catalogManagerForSessionFunctionKinds match {
+        case Some(cm) =>
+          CatalogManager.systemFunctionKindsFromPath(
+            cm.sqlResolutionPathEntries(cm.currentCatalog.name(), 
cm.currentNamespace.toSeq))
+        case None =>
+          CatalogManager.systemFunctionKindsFromPath(conf.systemPathOrder)
+      }
+    }
+
+  /**
+   * True iff the effective SQL PATH searches `system.session` before 
`system.builtin`. Used
+   * to gate the security check that blocks temporary functions from silently 
shadowing a
+   * builtin of the same name.
+   */
+  private def sessionFirstInPath: Boolean =
+    sessionFunctionKindsInResolutionOrder.headOption.contains(Temp)
+
   /**
    * Checks if a namespace represents temporary functions.
    */
@@ -2081,12 +2130,11 @@ class SessionCatalog(
       qualifyIdentifier(func)
     }
 
-    // Security check: When legacy mode is enabled, block SQL-created 
temporary functions
-    // from shadowing builtin functions (to preserve master behavior)
-    // Scala UDFs are still allowed to shadow in legacy mode
-    // We throw ROUTINE_ALREADY_EXISTS to indicate the builtin function 
already exists
-    val sessionFirst = conf.sessionFunctionResolutionOrder == "first"
-    if (func.database.isEmpty && sessionFirst && !overrideIfExists) {
+    // Security check: when the effective SQL PATH searches `system.session` 
before
+    // `system.builtin`, block creating an unqualified temporary function 
whose name
+    // collides with a builtin so it cannot silently shadow that builtin via 
unqualified
+    // resolution. We throw ROUTINE_ALREADY_EXISTS to indicate the conflict.
+    if (func.database.isEmpty && sessionFirstInPath && !overrideIfExists) {
       val funcName = func.funcName
       // Check if function exists in builtin namespace (extensions are stored 
as builtins)
       val builtinIdent = FunctionRegistry.builtinFunctionIdentifier(funcName)
@@ -2206,10 +2254,11 @@ class SessionCatalog(
       // Use FunctionIdentifier with session namespace for temporary functions
       val tempIdentifier = tempFunctionIdentifier(function.name.funcName)
 
-      // Security check: When legacy mode is enabled, block SQL-created 
temporary functions
-      // from shadowing builtin functions (including extensions) as a safeguard
-      // We throw ROUTINE_ALREADY_EXISTS to indicate the builtin function 
already exists
-      if ((conf.sessionFunctionResolutionOrder == "first") && 
!overrideIfExists) {
+      // Security check: when the effective SQL PATH searches `system.session` 
before
+      // `system.builtin`, block creating an unqualified temporary function 
whose name
+      // collides with a builtin (including extensions) so it cannot silently 
shadow that
+      // builtin via unqualified resolution.
+      if (sessionFirstInPath && !overrideIfExists) {
         val funcName = function.name.funcName
         // Check if function exists in builtin namespace (extensions are 
stored as builtins)
         val builtinIdent = FunctionRegistry.builtinFunctionIdentifier(funcName)
@@ -2515,7 +2564,12 @@ class SessionCatalog(
    * Look up the `ExpressionInfo` of the given function by name.
    * Resolution order follows the configured path (e.g. builtin then session).
    */
-  def lookupBuiltinOrTempTableFunction(name: String): Option[ExpressionInfo] = 
synchronized {
+  def lookupBuiltinOrTempTableFunction(name: String): Option[ExpressionInfo] = 
{
+    // Intentionally not `synchronized` on this [[SessionCatalog]]. Resolution 
order may call
+    // into [[CatalogManager]] (e.g. 
[[CatalogManager.sqlResolutionPathEntries]]), which can
+    // synchronize on the manager; another
+    // thread can hold that lock and call into this catalog (e.g. via 
`setCurrentNamespace`),
+    // which would deadlock if this method also synchronized on `this`.
     lookupFunctionWithShadowing(name, tableFunctionRegistry, 
checkBuiltinOperators = false)
   }
 
@@ -2667,7 +2721,11 @@ class SessionCatalog(
   /**
    * Look up the [[ExpressionInfo]] associated with the specified function, 
assuming it exists.
    */
-  def lookupFunctionInfo(name: FunctionIdentifier): ExpressionInfo = 
synchronized {
+  def lookupFunctionInfo(name: FunctionIdentifier): ExpressionInfo = {
+    // Intentionally not `synchronized` on this [[SessionCatalog]] (see
+    // [[lookupBuiltinOrTempTableFunction]]): unqualified builtin/temp 
resolution uses
+    // [[sessionFunctionKindsInResolutionOrder]] / [[CatalogManager]] and must 
not run under
+    // this catalog's intrinsic lock.
     if (name.database.isEmpty) {
       lookupBuiltinOrTempFunction(name.funcName)
         .orElse(lookupBuiltinOrTempTableFunction(name.funcName))
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AbstractSqlParser.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AbstractSqlParser.scala
index 216136d8a7c8..29bf924f244e 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AbstractSqlParser.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AbstractSqlParser.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.parser.ParserUtils.withOrigin
 import org.apache.spark.sql.catalyst.plans.logical.{CompoundPlanStatement, 
LogicalPlan}
 import org.apache.spark.sql.catalyst.trees.Origin
+import org.apache.spark.sql.connector.catalog.PathElement
 import org.apache.spark.sql.errors.QueryParsingErrors
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StructType
@@ -110,6 +111,18 @@ abstract class AbstractSqlParser extends AbstractParser 
with ParserInterface {
     }
   }
 
+  /**
+   * Parse the right-hand side of `SET PATH = ...` (a comma-separated list of 
path elements).
+   * Used by [[org.apache.spark.sql.connector.catalog.CatalogManager]] to 
honor the
+   * [[SQLConf.DEFAULT_PATH]] conf without re-implementing the SET PATH 
grammar.
+   */
+  private[sql] def parsePathElements(sqlText: String): Seq[PathElement] = 
parse(sqlText) { parser =>
+    val ctx = parser.singlePathElementList()
+    withErrorHandling(ctx, Some(sqlText)) {
+      astBuilder.visitSinglePathElementList(ctx)
+    }
+  }
+
   def withErrorHandling[T](ctx: ParserRuleContext, sqlText: 
Option[String])(toResult: => T): T = {
     withOrigin(ctx, sqlText) {
       try {
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index a9bbd34a72d1..cdb01c36d744 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -47,7 +47,7 @@ import 
org.apache.spark.sql.catalyst.trees.TreePattern.PARAMETER
 import org.apache.spark.sql.catalyst.types.DataTypeUtils
 import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, CollationFactory, 
DateTimeUtils, EvaluateUnresolvedInlineTable, IntervalUtils}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils.{convertSpecialDate, 
convertSpecialTimestamp, convertSpecialTimestampNTZ, getZoneId, stringToDate, 
stringToTime, stringToTimestamp, stringToTimestampWithoutTimeZone}
-import org.apache.spark.sql.connector.catalog.{CatalogV2Util, ChangelogInfo, 
SupportsNamespaces, TableCatalog, TableWritePrivilege}
+import org.apache.spark.sql.connector.catalog.{CatalogV2Util, ChangelogInfo, 
PathElement, SupportsNamespaces, TableCatalog, TableWritePrivilege}
 import org.apache.spark.sql.connector.catalog.ChangelogRange.{TimestampRange, 
UnboundedRange, VersionRange}
 import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition
 import org.apache.spark.sql.connector.expressions.{ApplyTransform, 
BucketTransform, DaysTransform, Expression => V2Expression, FieldReference, 
HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, Transform, 
YearsTransform}
@@ -708,6 +708,26 @@ class AstBuilder extends DataTypeAstBuilder
     visitMultipartIdentifier(ctx.multipartIdentifier)
   }
 
+  override def visitSinglePathElementList(
+      ctx: SinglePathElementListContext): Seq[PathElement] = withOrigin(ctx) {
+    ctx.pathElement().asScala.map(visitPathElement).toSeq
+  }
+
+  override def visitPathElement(ctx: PathElementContext): PathElement = 
withOrigin(ctx) {
+    if (ctx.DEFAULT_PATH() != null) PathElement.DefaultPath
+    else if (ctx.SYSTEM_PATH() != null) PathElement.SystemPath
+    else if (ctx.PATH() != null) PathElement.PathRef
+    else if (ctx.CURRENT_DATABASE() != null || ctx.CURRENT_SCHEMA() != null) {
+      PathElement.CurrentSchema
+    } else {
+      val parts = visitMultipartIdentifier(ctx.multipartIdentifier())
+      if (parts.length < 2) {
+        throw 
QueryCompilationErrors.invalidSqlPathSchemaReferenceError(parts.mkString("."))
+      }
+      PathElement.SchemaInPath(parts)
+    }
+  }
+
   override def visitSingleDataType(ctx: SingleDataTypeContext): DataType = 
withOrigin(ctx) {
     typedVisit[DataType](ctx.dataType)
   }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala
index f73b0e68cecc..9a39e4ebdd27 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.connector.catalog
 
+import java.util.concurrent.atomic.AtomicReference
+
 import scala.collection.mutable
 import scala.util.Try
 
@@ -24,6 +26,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.SQLConfHelper
 import org.apache.spark.sql.catalyst.catalog.{SessionCatalog, TempVariableManager}
+import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.util.StringUtils
 import org.apache.spark.sql.connector.catalog.transactions.Transaction
 import org.apache.spark.sql.errors.QueryCompilationErrors
@@ -52,6 +55,12 @@ class CatalogManager(
   // TODO: create a real SYSTEM catalog to host `TempVariableManager` under the SESSION namespace.
   val tempVariableManager: TempVariableManager = new TempVariableManager
 
+  // Wire `SessionCatalog`'s fast-path kinds to the live SQL PATH. The kinds list itself is
+  // pure data conversion (system entries from the path, in path order); the *decision* to use
+  // path-order kinds for unqualified lookups lives at the Strategy layer (see callers of
+  // [[CatalogManager.systemFunctionKindsFromPath]]).
+  v1SessionCatalog.bindCatalogManagerForSessionFunctionKinds(this)
+
   def catalog(name: String): CatalogPlugin = synchronized {
     if (name.equalsIgnoreCase(SESSION_CATALOG_NAME)) {
       v2SessionCatalog
@@ -138,8 +147,62 @@ class CatalogManager(
 
   private var _sessionPath: Option[Seq[SessionPathEntry]] = None
 
-  /** Returns the raw stored session path entries, or None if no path is set. */
-  def sessionPathEntries: Option[Seq[SessionPathEntry]] = synchronized { _sessionPath }
+  /**
+   * Cache for [[confDefaultPathEntries]]: stores the expanded [[SessionPathEntry]] list keyed
+   * on the trimmed [[SQLConf#DEFAULT_PATH]] string and
+   * [[SQLConf#SESSION_FUNCTION_RESOLUTION_ORDER]] value (the only conf that affects the
+   * expansion of `DEFAULT_PATH` / `SYSTEM_PATH` tokens).
+   * `CurrentSchemaEntry` markers are preserved unresolved so the cache stays valid across
+   * `USE SCHEMA`.
+   */
+  private val confDefaultPathCache =
+    new AtomicReference[Option[(String, String, Seq[SessionPathEntry])]](None)
+
+  /**
+   * Returns the effective session path entries: the explicit `SET PATH` value if stored,
+   * else the parsed [[SQLConf#DEFAULT_PATH]] conf if non-empty (mirroring how
+   * [[currentCatalog]] falls back to [[SQLConf#DEFAULT_CATALOG]]). Returns `None` when
+   * [[SQLConf#PATH_ENABLED]] is false or both sources are empty.
+   */
+  def sessionPathEntries: Option[Seq[SessionPathEntry]] = synchronized {
+    if (!conf.pathEnabled) None
+    else _sessionPath.orElse(confDefaultPathEntries)
+  }
+
+  /** Raw `_sessionPath` (post-`SET PATH`), without the [[SQLConf#DEFAULT_PATH]] fallback. */
+  def storedSessionPathEntries: Option[Seq[SessionPathEntry]] = synchronized { _sessionPath }
+
+  /**
+   * Parsed and expanded [[SQLConf#DEFAULT_PATH]] value, or `None` when the conf is empty.
+   * Reuses the SET PATH grammar via
+   * [[org.apache.spark.sql.catalyst.parser.AbstractSqlParser#parsePathElements]] (via
+   * [[org.apache.spark.sql.catalyst.parser.CatalystSqlParser]]). An inner
+   * `DEFAULT_PATH` token resolves to the spark-builtin default ordering (cycle break).
+   *
+   * Unlike `SET PATH`, this does NOT run a duplicate check: lookup uses first-match
+   * resolution, so any redundant entry (including ones that only collide after a later
+   * `USE SCHEMA`) is dead code rather than an error. Cached so the hot path is a single
+   * atomic load on conf-stable sessions.
+   */
+  def confDefaultPathEntries: Option[Seq[SessionPathEntry]] = {
+    val confValue = conf.defaultPath
+    if (confValue == null || confValue.trim.isEmpty) {
+      confDefaultPathCache.set(None)
+      None
+    } else {
+      val trimmed = confValue.trim
+      val sessionOrder = conf.sessionFunctionResolutionOrder
+      val expanded = confDefaultPathCache.get() match {
+        case Some((k, ord, cached)) if k == trimmed && ord == sessionOrder => cached
+        case _ =>
+          val elements = CatalystSqlParser.parsePathElements(trimmed)
+          val computed = PathElement.expand(elements, conf, this, isConfDefaultExpansion = true)
+          confDefaultPathCache.set(Some((trimmed, sessionOrder, computed)))
+          computed
+      }
+      if (expanded.isEmpty) None else Some(expanded)
+    }
+  }
 
   def setSessionPath(entries: Seq[SessionPathEntry]): Unit = synchronized {
     _sessionPath = Some(entries)
@@ -150,18 +213,18 @@ class CatalogManager(
   }
 
   private[sql] def copySessionPathFrom(other: CatalogManager): Unit = synchronized {
-    _sessionPath = other.sessionPathEntries
+    _sessionPath = other.storedSessionPathEntries
   }
 
   /**
    * String form of the current resolution path for CURRENT_PATH().
-   * When PATH is enabled and a session path is stored, formats the effective path entries
-   * with markers expanded. Otherwise falls back to the legacy resolutionSearchPath.
+   * When PATH is enabled and a session path is in effect (stored or via
+   * [[SQLConf#DEFAULT_PATH]]), formats the resolved entries. Otherwise falls back to the legacy
+   * resolutionSearchPath.
    */
   def currentPathString: String = synchronized {
     import CatalogV2Implicits._
-    val stored = if (conf.pathEnabled) _sessionPath else None
-    stored match {
+    sessionPathEntries match {
       case Some(entries) =>
         val resolved = CatalogManager.resolvePathEntries(
           entries, currentCatalog.name(), currentNamespace.toSeq)
@@ -174,8 +237,9 @@ class CatalogManager(
 
   /**
    * Ordered catalog/schema path entries for resolving unqualified SQL object names.
-   * When PATH is off or unset, applies [[SQLConf.defaultPathOrder]] (legacy).
-   * When PATH is explicitly set, uses the resolved stored path entries.
+   * When PATH is off or unset, applies [[SQLConf#defaultPathOrder]] (legacy).
+   * When PATH is in effect (stored or via the [[SQLConf#DEFAULT_PATH]] conf), uses the
+   * resolved entries.
    */
   def sqlResolutionPathEntries(
       pathDefaultCatalog: String,
@@ -185,8 +249,7 @@ class CatalogManager(
     val defaultEntry =
       if (pathDefaultNamespace.isEmpty) Seq(pathDefaultCatalog)
       else pathDefaultCatalog +: pathDefaultNamespace
-    val stored = if (conf.pathEnabled) _sessionPath else None
-    stored match {
+    sessionPathEntries match {
       case Some(entries) =>
         CatalogManager.resolvePathEntries(entries, expandCatalog, expandNamespace)
       case None =>
@@ -204,16 +267,17 @@ class CatalogManager(
 
   /**
    * True if `system.session` is on the SQL path. Only literal path entries can match: the
-   * [[CurrentSchemaEntry]] marker expands to `currentCatalog.name() +: currentNamespace`, and
+   * [[org.apache.spark.sql.connector.catalog.CatalogManager.CurrentSchemaEntry$]] marker expands to
+   * `currentCatalog.name() +: currentNamespace`, and
    * `system` is not a registered catalog (it is a synthetic namespace served via
    * [[org.apache.spark.sql.catalyst.analysis.FakeSystemCatalog]] / `lookupBuiltinOrTempFunction`,
    * not loadable via [[catalog]]), so `currentCatalog.name()` cannot be `"system"`. If that
    * invariant ever changes, this short-circuit must be revisited.
-   * Inspecting stored entries directly avoids loading the configured default catalog.
+   * Inspecting effective entries directly avoids loading the configured default catalog.
    */
   def isSystemSessionOnPath: Boolean = synchronized {
     if (!conf.pathEnabled) return true
-    _sessionPath match {
+    sessionPathEntries match {
       case None => true
       case Some(entries) => entries.exists {
         case CatalogManager.LiteralPathEntry(parts) =>
@@ -289,6 +353,7 @@ class CatalogManager(
     _currentNamespace = None
     _currentCatalogName = None
     _sessionPath = None
+    confDefaultPathCache.set(None)
     v1SessionCatalog.setCurrentDatabase(conf.defaultDatabase)
   }
 }
@@ -333,16 +398,37 @@ private[sql] object CatalogManager extends Logging {
   /**
    * True if the multipart name uses the session temp view namespace: two-part `session.view`
    * or three-part `system.session.view`. The two-part form can also denote a persistent relation
-   * in schema `session`; resolution order is controlled by [[SQLConf.prioritizeSystemCatalog]].
+   * in schema `session`; resolution order is controlled by [[SQLConf#prioritizeSystemCatalog]].
    */
   def isSessionQualifiedViewName(nameParts: Seq[String]): Boolean = {
     (nameParts.length == 2 && nameParts.head.equalsIgnoreCase(SESSION_NAMESPACE)) ||
       isFullyQualifiedSystemSessionViewName(nameParts)
   }
 
-  /** True if a SQL path entry is the well-known `system.session` entry. */
+  /** True if a SQL path entry is the well-known `system.session` entry (case-insensitive). */
   def isSystemSessionPathEntry(parts: Seq[String]): Boolean =
-    parts == Seq(SYSTEM_CATALOG_NAME, SESSION_NAMESPACE)
+    parts.length == 2 &&
+      parts.head.equalsIgnoreCase(SYSTEM_CATALOG_NAME) &&
+      parts(1).equalsIgnoreCase(SESSION_NAMESPACE)
+
+  /** True if a SQL path entry is the well-known `system.builtin` entry (case-insensitive). */
+  def isSystemBuiltinPathEntry(parts: Seq[String]): Boolean =
+    parts.length == 2 &&
+      parts.head.equalsIgnoreCase(SYSTEM_CATALOG_NAME) &&
+      parts(1).equalsIgnoreCase(BUILTIN_NAMESPACE)
+
+  /**
+   * Extract `system.builtin` / `system.session` entries from a resolved PATH, mapped to
+   * [[SessionCatalog.SessionFunctionKind]] in path order. Pure data conversion -- callers
+   * decide whether and how to use this list.
+   */
+  def systemFunctionKindsFromPath(
+      path: Seq[Seq[String]]): Seq[SessionCatalog.SessionFunctionKind] =
+    path.flatMap { e =>
+      if (isSystemBuiltinPathEntry(e)) Some(SessionCatalog.Builtin)
+      else if (isSystemSessionPathEntry(e)) Some(SessionCatalog.Temp)
+      else None
+    }
 
   /**
    * A single entry in the session SQL path: either a literal schema
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/PathElement.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/PathElement.scala
new file mode 100644
index 000000000000..ee9959762da9
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/PathElement.scala
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.catalog
+
+import java.util.Locale
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.connector.catalog.CatalogManager.{
+  CurrentSchemaEntry, LiteralPathEntry, SessionPathEntry
+}
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * One element on the right-hand side of `SET PATH = ...`: either a well-known shortcut
+ * keyword (DEFAULT_PATH, SYSTEM_PATH, PATH, CURRENT_SCHEMA / CURRENT_DATABASE) or a
+ * fully qualified schema reference (`catalog.namespace...` with at least 2 parts).
+ *
+ * The same grammar is reused to parse the
+ * [[org.apache.spark.sql.internal.SQLConf#DEFAULT_PATH()]] conf value, so this
+ * AST node lives in catalyst beside [[CatalogManager]] rather than in the runtime
+ * [[org.apache.spark.sql.execution.command.SetPathCommand]].
+ */
+private[sql] sealed trait PathElement
+
+private[sql] object PathElement {
+  case object DefaultPath extends PathElement
+  case object SystemPath extends PathElement
+  case object PathRef extends PathElement
+
+  /**
+   * Current database/schema (SQL aliases). Stored as the
+   * [[org.apache.spark.sql.connector.catalog.CatalogManager.CurrentSchemaEntry$]]
+   * marker so resolution candidates expand against the live `USE SCHEMA`.
+   */
+  case object CurrentSchema extends PathElement
+
+  /** Fully qualified schema reference (`catalog.namespace...`). Must have at least 2 parts. */
+  case class SchemaInPath(parts: Seq[String]) extends PathElement
+
+  /**
+   * Expand a parsed [[PathElement]] list into concrete [[SessionPathEntry]] entries
+   * suitable for storing in [[CatalogManager._sessionPath]] or returning from
+   * [[CatalogManager#sessionPathEntries]].
+   *
+   * @param isConfDefaultExpansion when true, an inner [[DefaultPath]] token resolves
+   *                               to the spark-builtin default ordering (cycle break)
+   *                               rather than reading
+   *                               [[org.apache.spark.sql.internal.SQLConf#DEFAULT_PATH()]] again.
+   *                               Set to true when this method is invoked while
+   *                               parsing [[org.apache.spark.sql.internal.SQLConf#DEFAULT_PATH()]]
+   *                               itself.
+   */
+  def expand(
+      elements: Seq[PathElement],
+      conf: SQLConf,
+      catalogManager: CatalogManager,
+      isConfDefaultExpansion: Boolean = false): Seq[SessionPathEntry] = {
+    val currentSchemaSentinel = Seq("__current_schema__")
+
+    def toEntries(parts: Seq[Seq[String]]): Seq[SessionPathEntry] = parts.map {
+      case p if p == currentSchemaSentinel => CurrentSchemaEntry
+      case p => LiteralPathEntry(p)
+    }
+
+    def builtinDefaultWithCurrentSchema: Seq[SessionPathEntry] =
+      toEntries(conf.defaultPathOrder(Seq(currentSchemaSentinel)))
+
+    def defaultPathExpansion: Seq[SessionPathEntry] = {
+      if (isConfDefaultExpansion) {
+        // Cycle break: inner DEFAULT_PATH inside the conf default value falls back to the
+        // spark-builtin default ordering instead of recursing.
+        builtinDefaultWithCurrentSchema
+      } else {
+        catalogManager.confDefaultPathEntries.getOrElse(builtinDefaultWithCurrentSchema)
+      }
+    }
+
+    elements.flatMap {
+      case DefaultPath =>
+        defaultPathExpansion
+      case SystemPath =>
+        toEntries(conf.systemPathOrder)
+      case CurrentSchema =>
+        Seq(CurrentSchemaEntry)
+      case PathRef =>
+        catalogManager.storedSessionPathEntries.getOrElse(defaultPathExpansion)
+      case SchemaInPath(parts) =>
+        Seq(LiteralPathEntry(parts))
+    }
+  }
+
+  /**
+   * Reject *static* duplicates in a SET PATH entry list: identical
+   * [[CatalogManager#LiteralPathEntry]] parts and repeated
+   * [[org.apache.spark.sql.connector.catalog.CatalogManager.CurrentSchemaEntry$]] markers
+   * (the `current_schema` / `current_database`
+   * cross-alias case). Used for the interactive `SET PATH` form to surface user typos at
+   * statement time.
+   *
+   * Deliberately does NOT compare a [[CatalogManager#LiteralPathEntry]] against a
+   * [[org.apache.spark.sql.connector.catalog.CatalogManager.CurrentSchemaEntry$]]: such a
+   * "duplicate" depends on the live `USE SCHEMA`
+   * and is harmless at lookup (first-match resolution skips the dead literal).
+   * [[org.apache.spark.sql.internal.SQLConf#DEFAULT_PATH()]] expansion skips this check
+   * entirely so transient `USE`-induced
+   * collisions don't wedge unqualified resolution.
+   */
+  def validateNoStaticDuplicates(
+      entries: Seq[SessionPathEntry],
+      caseSensitive: Boolean): Seq[SessionPathEntry] = {
+    val seenLiterals = new mutable.HashSet[Seq[String]]
+    var seenCurrentSchema = false
+    entries.foreach {
+      case CurrentSchemaEntry =>
+        if (seenCurrentSchema) {
+          throw new AnalysisException(
+            errorClass = "DUPLICATE_SQL_PATH_ENTRY",
+            messageParameters = Map("pathEntry" -> "current_schema"))
+        }
+        seenCurrentSchema = true
+      case LiteralPathEntry(parts) =>
+        val key = if (caseSensitive) parts else parts.map(_.toLowerCase(Locale.ROOT))
+        if (!seenLiterals.add(key)) {
+          throw new AnalysisException(
+            errorClass = "DUPLICATE_SQL_PATH_ENTRY",
+            messageParameters = Map(
+              "pathEntry" ->
+                parts.map(p => if (p.contains(".")) s"`$p`" else p).mkString(".")))
+        }
+    }
+    entries
+  }
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 59958eb8da5c..290de17fb239 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -43,9 +43,11 @@ import org.apache.spark.sql.catalyst.ScalaReflection
 import org.apache.spark.sql.catalyst.analysis.{HintErrorLogger, Resolver}
 import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode
 import org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator
+import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.plans.logical.HintErrorHandler
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
+import org.apache.spark.sql.connector.catalog.PathElement.PathRef
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
 import org.apache.spark.sql.types.{AtomicType, TimestampNTZType, TimestampType}
 import org.apache.spark.storage.{StorageLevel, StorageLevelMapper}
@@ -2447,6 +2449,29 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val DEFAULT_PATH =
+    buildConf("spark.sql.defaultPath")
+      .version("4.2.0")
+      .doc("Default SQL PATH used when no SET PATH has been issued in the session; this is " +
+        "also the value to which `SET PATH = DEFAULT_PATH` expands. Accepts the full SET PATH " +
+        "grammar; an inner DEFAULT_PATH token resolves to the spark-builtin default ordering. " +
+        "The PATH keyword is not allowed in this conf value. " +
+        "When empty, the spark-builtin default ordering controlled by " +
+        "`spark.sql.functionResolution.sessionOrder` applies. Validated for syntax at set time; " +
+        "redundant entries are tolerated (lookup uses first-match resolution). The interactive " +
+        "SET PATH form still rejects static duplicates as a typo guard.")
+      .withBindingPolicy(ConfigBindingPolicy.SESSION)
+      .stringConf
+      .checkValue(
+        v =>
+          v == null || v.trim.isEmpty ||
+            Try(CatalystSqlParser.parsePathElements(v.trim))
+              .toOption
+              .exists(!_.contains(PathRef)),
+        "The value must be empty or a comma-separated SET PATH element list " +
+          "(same grammar as SET PATH, except PATH is not allowed).")
+      .createWithDefault("")
+
   // Whether to retain group by columns or not in GroupedData.agg.
   val DATAFRAME_RETAIN_GROUP_COLUMNS = buildConf("spark.sql.retainGroupColumns")
     .version("1.4.0")
@@ -8538,6 +8563,8 @@ class SQLConf extends Serializable with Logging with SqlApiConf {
 
   def pathEnabled: Boolean = getConf(SQLConf.PATH_ENABLED)
 
+  def defaultPath: String = getConf(SQLConf.DEFAULT_PATH)
+
   /**
    * Returns the resolution search path for error messages and resolution order.
    * This is the single source of truth for the search path used for functions, tables, and views.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 6b51a1edfa35..0ecbf7609d2b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -360,15 +360,7 @@ class SparkSqlAstBuilder extends AstBuilder {
    * }}}
    */
   override def visitSetPath(ctx: SetPathContext): LogicalPlan = withOrigin(ctx) {
-    val elements = ctx.pathElement().asScala.map { pe =>
-      if (pe.DEFAULT_PATH() != null) PathElement.DefaultPath
-      else if (pe.SYSTEM_PATH() != null) PathElement.SystemPath
-      else if (pe.PATH() != null) PathElement.PathRef
-      else if (pe.CURRENT_DATABASE() != null) PathElement.CurrentDatabase
-      else if (pe.CURRENT_SCHEMA() != null) PathElement.CurrentSchema
-      else PathElement.SchemaInPath(visitMultipartIdentifier(pe.multipartIdentifier()))
-    }.toSeq
-    SetPathCommand(elements)
+    SetPathCommand(ctx.pathElement().asScala.map(visitPathElement).toSeq)
   }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetPathCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetPathCommand.scala
index 70538160eefd..82ab46ec9b14 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetPathCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetPathCommand.scala
@@ -17,40 +17,17 @@
 
 package org.apache.spark.sql.execution.command
 
-import java.util.Locale
-
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.connector.catalog.CatalogManager
-import org.apache.spark.sql.connector.catalog.CatalogManager.{
-  CurrentSchemaEntry, LiteralPathEntry, SessionPathEntry
-}
-import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.connector.catalog.PathElement
 import org.apache.spark.sql.internal.SQLConf
 
-/**
- * Path element for SET PATH: either a well-known shortcut or a fully qualified schema reference.
- * SchemaInPath requires at least 2 parts (catalog.namespace); multi-level namespaces are allowed.
- */
-sealed trait PathElement
-
-object PathElement {
-  case object DefaultPath extends PathElement
-  case object SystemPath extends PathElement
-  case object PathRef extends PathElement
-  /**
-   * Current database/schema (SQL aliases). Stored as system.current_schema; expands when
-   * building resolution candidates so later USE SCHEMA is reflected.
-   */
-  case object CurrentDatabase extends PathElement
-  case object CurrentSchema extends PathElement
-  /** Fully qualified schema reference (catalog.namespace...). Must have at least 2 parts. */
-  case class SchemaInPath(parts: Seq[String]) extends PathElement
-}
-
 /**
  * Command for SET PATH = pathElement (, pathElement)*
  * Expands shortcuts at run time, validates no duplicates, and sets the internal session path.
+ *
+ * The [[PathElement]] AST and its expansion live in catalyst so that the same grammar can be
+ * reused to parse the [[SQLConf.DEFAULT_PATH]] conf value.
  */
 case class SetPathCommand(elements: Seq[PathElement]) extends LeafRunnableCommand {
 
@@ -64,23 +41,9 @@ case class SetPathCommand(elements: Seq[PathElement]) extends LeafRunnableComman
     }
     val conf = sparkSession.sessionState.conf
     val catalogManager = sparkSession.sessionState.catalogManager
-    val currentCatalog = catalogManager.currentCatalog.name
-    val currentNamespace = catalogManager.currentNamespace.toSeq
-    val caseSensitive = conf.caseSensitiveAnalysis
 
-    val expanded = expandPathElements(elements, conf, catalogManager)
-    val seen = new scala.collection.mutable.HashSet[Seq[String]]
-    expanded.foreach { entry =>
-      val concrete = entry.resolve(currentCatalog, currentNamespace)
-      def normalize(s: String): String = if (caseSensitive) s else s.toLowerCase(Locale.ROOT)
-      val key = concrete.map(normalize)
-      if (!seen.add(key)) {
-        throw new AnalysisException(
-          errorClass = "DUPLICATE_SQL_PATH_ENTRY",
-          messageParameters = Map("pathEntry" ->
-            concrete.map(p => if (p.contains(".")) s"`$p`" else p).mkString(".")))
-      }
-    }
+    val expanded0 = PathElement.expand(elements, conf, catalogManager)
+    val expanded = PathElement.validateNoStaticDuplicates(expanded0, conf.caseSensitiveAnalysis)
 
     if (expanded.isEmpty) {
       catalogManager.clearSessionPath()
@@ -89,36 +52,4 @@ case class SetPathCommand(elements: Seq[PathElement]) extends LeafRunnableComman
     }
     Seq.empty
   }
-
-  private def expandPathElements(
-      elements: Seq[PathElement],
-      conf: SQLConf,
-      catalogManager: CatalogManager): Seq[SessionPathEntry] = {
-    val currentSchemaSentinel = Seq("__current_schema__")
-
-    def toEntries(parts: Seq[Seq[String]]): Seq[SessionPathEntry] = parts.map {
-      case p if p == currentSchemaSentinel => CurrentSchemaEntry
-      case p => LiteralPathEntry(p)
-    }
-
-    def defaultWithCurrentSchema: Seq[SessionPathEntry] =
-      toEntries(conf.defaultPathOrder(Seq(currentSchemaSentinel)))
-
-    elements.flatMap {
-      case PathElement.DefaultPath =>
-        defaultWithCurrentSchema
-      case PathElement.SystemPath =>
-        toEntries(conf.systemPathOrder)
-      case PathElement.CurrentDatabase | PathElement.CurrentSchema =>
-        Seq(CurrentSchemaEntry)
-      case PathElement.PathRef =>
-        catalogManager.sessionPathEntries.getOrElse(defaultWithCurrentSchema)
-      case PathElement.SchemaInPath(parts) =>
-        if (parts.length < 2) {
-          throw QueryCompilationErrors.invalidSqlPathSchemaReferenceError(parts.mkString("."))
-        }
-        Seq(LiteralPathEntry(parts))
-    }
-  }
-
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
index 9839a3edbbab..79db97744496 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
@@ -254,9 +254,13 @@ case class DropFunctionCommand(
         identifier.funcName
       }
 
-      // Check if temp function exists first - if it does, allow dropping it even if a builtin
-      // with the same name exists (shadowing case)
-      if (!catalog.isTemporaryFunction(FunctionIdentifier(funcName)) &&
+      // Keep DROP TEMPORARY FUNCTION semantics consistent for unqualified names:
+      // - builtin name, no temp present, no IF EXISTS => FORBIDDEN_OPERATION
+      // - IF EXISTS => no-op
+      // Qualified temp namespaces (session / system.session) always target temp functions.
+      if (identifier.database.isEmpty &&
+          !ifExists &&
+          !catalog.isTemporaryFunction(FunctionIdentifier(funcName)) &&
           catalog.isBuiltinFunction(funcName)) {
         throw QueryCompilationErrors.cannotDropBuiltinFuncError(funcName)
       }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SetPathSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SetPathSuite.scala
index ad6bcd414966..18b9f6b6f3b7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SetPathSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SetPathSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql
 
+import org.apache.spark.SparkIllegalArgumentException
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
 
@@ -211,7 +212,7 @@ class SetPathSuite extends SharedSparkSession {
         },
         condition = "DUPLICATE_SQL_PATH_ENTRY",
         sqlState = Some("42732"),
-        parameters = Map("pathEntry" -> "spark_catalog.default"))
+        parameters = Map("pathEntry" -> "current_schema"))
     }
   }
 
@@ -231,16 +232,17 @@ class SetPathSuite extends SharedSparkSession {
     }
   }
 
-  test("PATH enabled: duplicate after expanding CURRENT_SCHEMA") {
+  test("PATH enabled: literal + CURRENT_SCHEMA collision is tolerated (USE-state dependent)") {
+    // SET PATH only rejects static duplicates (literal-vs-literal, current_schema repeated).
+    // A literal that happens to match the live current_schema is not flagged: a later
+    // `USE SCHEMA` may make them diverge, and at lookup the first match wins anyway.
+    // `system.builtin` is included so `current_path()` itself remains resolvable.
     withPathEnabled {
       sql("USE spark_catalog.default")
-      checkError(
-        exception = intercept[AnalysisException] {
-          sql("SET PATH = spark_catalog.default, current_schema")
-        },
-        condition = "DUPLICATE_SQL_PATH_ENTRY",
-        sqlState = Some("42732"),
-        parameters = Map("pathEntry" -> "spark_catalog.default"))
+      sql("SET PATH = spark_catalog.default, current_schema, system.builtin")
+      val entries = pathEntries(currentPath())
+      assert(entries === Seq("spark_catalog.default", "spark_catalog.default", "system.builtin"),
+        s"Expected literal + resolved CURRENT_SCHEMA preserved; got: $entries")
     }
   }
 
@@ -253,7 +255,7 @@ class SetPathSuite extends SharedSparkSession {
         },
         condition = "DUPLICATE_SQL_PATH_ENTRY",
         sqlState = Some("42732"),
-        parameters = Map("pathEntry" -> "spark_catalog.default"))
+        parameters = Map("pathEntry" -> "current_schema"))
     }
   }
 
@@ -568,4 +570,243 @@ class SetPathSuite extends SharedSparkSession {
       }
     }
   }
+
+  // --- spark.sql.defaultPath (SQLConf.DEFAULT_PATH) ---
+  // The conf carries the SET PATH grammar; sessionPathEntries falls back to it lazily
+  // when no `SET PATH` has been issued, mirroring how `currentCatalog` falls back to
+  // [[SQLConf.DEFAULT_CATALOG]].
+
+  test("DEFAULT_PATH conf: lazy fallback when no SET PATH issued") {
+    withSQLConf(
+        SQLConf.PATH_ENABLED.key -> "true",
+        SQLConf.DEFAULT_PATH.key -> "spark_catalog.default, system.builtin") {
+      val catalogManager = spark.sessionState.catalogManager
+      val priorSessionPath = catalogManager.storedSessionPathEntries
+      catalogManager.clearSessionPath()
+      try {
+        val entries = pathEntries(currentPath())
+        assert(entries == Seq("spark_catalog.default", "system.builtin"),
+          s"Expected DEFAULT_PATH conf to drive current_path(); got: $entries")
+        assert(catalogManager.storedSessionPathEntries.isEmpty,
+          "DEFAULT_PATH lookup must not write to the in-memory stored session path")
+      } finally {
+        catalogManager.clearSessionPath()
+        priorSessionPath.foreach(catalogManager.setSessionPath)
+      }
+    }
+  }
+
+  test("DEFAULT_PATH conf: explicit SET PATH overrides the conf") {
+    withSQLConf(
+        SQLConf.PATH_ENABLED.key -> "true",
+        SQLConf.DEFAULT_PATH.key -> "system.builtin, system.session") {
+      val catalogManager = spark.sessionState.catalogManager
+      val priorSessionPath = catalogManager.storedSessionPathEntries
+      try {
+        sql("SET PATH = system.session, system.builtin")
+        val entries = pathEntries(currentPath())
+        assert(entries == Seq("system.session", "system.builtin"),
+          s"Expected SET PATH to win over DEFAULT_PATH conf; got: $entries")
+      } finally {
+        catalogManager.clearSessionPath()
+        priorSessionPath.foreach(catalogManager.setSessionPath)
+      }
+    }
+  }
+
+  test("DEFAULT_PATH conf: SET PATH = DEFAULT_PATH expands to the conf value") {
+    withSQLConf(
+        SQLConf.PATH_ENABLED.key -> "true",
+        SQLConf.DEFAULT_PATH.key -> "system.session, system.builtin, current_schema") {
+      val catalogManager = spark.sessionState.catalogManager
+      val priorSessionPath = catalogManager.storedSessionPathEntries
+      try {
+        sql("SET PATH = DEFAULT_PATH")
+        val entries = pathEntries(currentPath())
+        assert(entries.head.contains("system.session"),
+          s"DEFAULT_PATH expansion should follow conf order (session first); got: $entries")
+        assert(catalogManager.storedSessionPathEntries.isDefined,
+          "After SET PATH the in-memory stored session path should be populated")
+      } finally {
+        catalogManager.clearSessionPath()
+        priorSessionPath.foreach(catalogManager.setSessionPath)
+      }
+    }
+  }
+
+  test("DEFAULT_PATH conf: cycle break -- inner DEFAULT_PATH falls back to builtin order") {
+    withSQLConf(
+        SQLConf.PATH_ENABLED.key -> "true",
+        SQLConf.DEFAULT_PATH.key -> "DEFAULT_PATH",
+        // Pin order conf to "first" so the spark-builtin default ordering is observable.
+        SQLConf.SESSION_FUNCTION_RESOLUTION_ORDER.key -> "first") {
+      val catalogManager = spark.sessionState.catalogManager
+      val priorSessionPath = catalogManager.storedSessionPathEntries
+      catalogManager.clearSessionPath()
+      try {
+        val entries = pathEntries(currentPath())
+        assert(entries.head.contains("system.session"),
+          s"Inner DEFAULT_PATH should resolve to builtin order seeded by the order conf " +
+            s"('first' -> session leading); got: $entries")
+      } finally {
+        catalogManager.clearSessionPath()
+        priorSessionPath.foreach(catalogManager.setSessionPath)
+      }
+    }
+  }
+
+  test("DEFAULT_PATH conf: invalid value rejected on SET spark.sql.defaultPath") {
+    withPathEnabled {
+      val e = intercept[SparkIllegalArgumentException] {
+        sql("SET spark.sql.defaultPath = this is not a path")
+      }
+      assert(e.getCondition.startsWith("INVALID_CONF_VALUE"), e.getMessage)
+    }
+  }
+
+  test("DEFAULT_PATH conf: PATH keyword is rejected on SET spark.sql.defaultPath") {
+    withPathEnabled {
+      val e = intercept[SparkIllegalArgumentException] {
+        sql("SET spark.sql.defaultPath = PATH, system.builtin")
+      }
+      assert(e.getCondition.startsWith("INVALID_CONF_VALUE"), e.getMessage)
+    }
+  }
+
+  test("DEFAULT_PATH conf: PATH disabled returns no fallback") {
+    withSQLConf(
+        SQLConf.PATH_ENABLED.key -> "false",
+        SQLConf.DEFAULT_PATH.key -> "system.session, system.builtin") {
+      val catalogManager = spark.sessionState.catalogManager
+      assert(catalogManager.sessionPathEntries.isEmpty,
+        "DEFAULT_PATH conf must not take effect when PATH is disabled")
+    }
+  }
+
+  // --- Path-driven security check (built on the lazy DEFAULT_PATH fallback) ---
+  // The "block temp function shadowing builtin" check is now driven by the live PATH, so
+  // changes via SET PATH or DEFAULT_PATH take effect even when the legacy order conf is
+  // left at its default.
+
+  test("path-driven security check: SET PATH putting session before builtin blocks temp " +
+      "function with a builtin name") {
+    withPathEnabled {
+      val catalogManager = spark.sessionState.catalogManager
+      val priorSessionPath = catalogManager.storedSessionPathEntries
+      try {
+        // Default `sessionFunctionResolutionOrder` is "second" (builtin first), but SET PATH
+        // overrides that to put session first. The security check must reflect the live path.
+        sql("SET PATH = system.session, system.builtin")
+        val e = intercept[AnalysisException] {
+          sql("CREATE TEMPORARY FUNCTION count() RETURNS INT RETURN 1")
+        }
+        assert(e.getCondition == "ROUTINE_ALREADY_EXISTS", e.getMessage)
+      } finally {
+        sql("DROP TEMPORARY FUNCTION IF EXISTS session.count")
+        catalogManager.clearSessionPath()
+        priorSessionPath.foreach(catalogManager.setSessionPath)
+      }
+    }
+  }
+
+  test("path-driven security check: DEFAULT_PATH conf putting session before builtin " +
+      "blocks temp function with a builtin name (no SET PATH issued)") {
+    withSQLConf(
+        SQLConf.PATH_ENABLED.key -> "true",
+        SQLConf.DEFAULT_PATH.key -> "system.session, system.builtin") {
+      val catalogManager = spark.sessionState.catalogManager
+      val priorSessionPath = catalogManager.storedSessionPathEntries
+      catalogManager.clearSessionPath()
+      try {
+        // Order conf is left at its default ("second"). The path-driven gate must read
+        // DEFAULT_PATH and fire the security check for unqualified temp/builtin collisions.
+        val e = intercept[AnalysisException] {
+          sql("CREATE TEMPORARY FUNCTION count() RETURNS INT RETURN 1")
+        }
+        assert(e.getCondition == "ROUTINE_ALREADY_EXISTS", e.getMessage)
+      } finally {
+        sql("DROP TEMPORARY FUNCTION IF EXISTS session.count")
+        catalogManager.clearSessionPath()
+        priorSessionPath.foreach(catalogManager.setSessionPath)
+      }
+    }
+  }
+
+  test("PATH enabled: SET PATH with only user schemas does not implicitly resolve builtins") {
+    withPathEnabled {
+      sql("CREATE SCHEMA IF NOT EXISTS only_user_on_path")
+      try {
+        sql("SET PATH = spark_catalog.only_user_on_path")
+        val e = intercept[AnalysisException] {
+          sql("SELECT abs(-1)").collect()
+        }
+        assert(e.getCondition == "UNRESOLVED_ROUTINE", e.getMessage)
+      } finally {
+        sql("DROP SCHEMA IF EXISTS only_user_on_path")
+      }
+    }
+  }
+
+  test("PATH enabled: explicit SET PATH with system.session AFTER a user catalog still " +
+      "reaches temp functions") {
+    // Explicit paths are honored as written: placing `system.session` after a user catalog
+    // is the user's authorization for unqualified temp functions to resolve. Contrast with
+    // the implicit (no SET PATH, no DEFAULT_PATH) form, which preserves the security property
+    // of the seeded default path.
+    withPathEnabled {
+      sql("CREATE SCHEMA IF NOT EXISTS path_interleaved_user")
+      try {
+        sql("CREATE TEMPORARY FUNCTION path_interleaved_temp() RETURNS INT RETURN 7")
+        try {
+          sql("SET PATH = system.builtin, spark_catalog.path_interleaved_user, system.session")
+          checkAnswer(sql("SELECT path_interleaved_temp()"), Row(7))
+        } finally {
+          sql("DROP TEMPORARY FUNCTION IF EXISTS path_interleaved_temp")
+        }
+      } finally {
+        sql("DROP SCHEMA IF EXISTS path_interleaved_user")
+      }
+    }
+  }
+
+  test("PATH enabled: SET PATH with user schema before system.builtin still resolves builtins") {
+    // Exercises systemFunctionKindsFromPath with a user-catalog entry preceding
+    // system.builtin: the helper flat-scans the path, so Builtin still appears
+    // in the kinds list and unqualified `abs` resolves.
+    withPathEnabled {
+      sql("CREATE SCHEMA IF NOT EXISTS path_user_before_builtin")
+      try {
+        sql("SET PATH = spark_catalog.path_user_before_builtin, system.builtin")
+        // `abs` is a builtin; if Builtin did not appear in the kinds list,
+        // unqualified `abs(-1)` would fail with UNRESOLVED_ROUTINE.
+        checkAnswer(sql("SELECT abs(-1)"), Row(1))
+      } finally {
+        sql("DROP SCHEMA IF EXISTS path_user_before_builtin")
+      }
+    }
+  }
+
+  test("DEFAULT_PATH conf: duplicate entries are tolerated (first-match resolution)") {
+    // Lookup uses first-match resolution, so redundant entries on DEFAULT_PATH are dead code
+    // rather than an error. (Contrast with SET PATH, which still rejects static duplicates as
+    // a user-input typo guard.) This avoids a UX cliff where a USE SCHEMA could later wedge
+    // every unqualified function lookup with DUPLICATE_SQL_PATH_ENTRY.
+    withSQLConf(
+        SQLConf.PATH_ENABLED.key -> "true",
+        SQLConf.DEFAULT_PATH.key -> "system.builtin, system.builtin") {
+      val catalogManager = spark.sessionState.catalogManager
+      val priorSessionPath = catalogManager.storedSessionPathEntries
+      catalogManager.clearSessionPath()
+      try {
+        val entries = pathEntries(currentPath())
+        assert(entries == Seq("system.builtin", "system.builtin"),
+          s"DEFAULT_PATH duplicates should pass through to current_path(); got: $entries")
+        // Sanity: unqualified resolution still works (the second `system.builtin` is dead).
+        checkAnswer(sql("SELECT abs(-1)"), Row(1))
+      } finally {
+        catalogManager.clearSessionPath()
+        priorSessionPath.foreach(catalogManager.setSessionPath)
+      }
+    }
+  }
 }

