This is an automated email from the ASF dual-hosted git repository.

yaooqinn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 1659122ca401 Revert "[SPARK-56750] default path config"
1659122ca401 is described below

commit 1659122ca40126e7efc17a550850974bb2022762
Author: Kent Yao <[email protected]>
AuthorDate: Mon May 11 12:12:37 2026 +0800

    Revert "[SPARK-56750] default path config"
    
    This reverts commit 6cc6e8ad4a2a5cd4a9a2e46ab46d5ae42643eb22.
---
 .../apache-spark-scala-lint-before-commit.mdc      |  11 -
 AGENTS.md                                          |   2 -
 .../spark/sql/catalyst/parser/SqlBaseParser.g4     |   4 -
 .../spark/sql/catalyst/analysis/Analyzer.scala     |   7 +-
 .../sql/catalyst/analysis/FunctionResolution.scala |  16 --
 .../analysis/resolver/FunctionResolver.scala       |  41 +--
 .../analysis/resolver/FunctionResolverUtils.scala  |  19 +-
 .../resolver/HigherOrderFunctionResolver.scala     |   2 +-
 .../analysis/resolver/ResolutionValidator.scala    |   6 -
 .../sql/catalyst/analysis/resolver/Resolver.scala  |  12 -
 .../sql/catalyst/catalog/SessionCatalog.scala      | 100 ++------
 .../sql/catalyst/parser/AbstractSqlParser.scala    |  13 -
 .../spark/sql/catalyst/parser/AstBuilder.scala     |  22 +-
 .../sql/connector/catalog/CatalogManager.scala     | 111 ++------
 .../spark/sql/connector/catalog/PathElement.scala  | 141 -----------
 .../org/apache/spark/sql/internal/SQLConf.scala    |  27 --
 .../spark/sql/execution/SparkSqlParser.scala       |  10 +-
 .../command/LeafRunnableCommandResolver.scala      |  40 ---
 .../sql/execution/command/SetPathCommand.scala     |  81 +++++-
 .../spark/sql/execution/command/functions.scala    |  10 +-
 .../sql/internal/BaseSessionStateBuilder.scala     |   7 +-
 .../scala/org/apache/spark/sql/SetPathSuite.scala  | 278 +--------------------
 .../spark/sql/hive/HiveSessionStateBuilder.scala   |   2 -
 23 files changed, 145 insertions(+), 817 deletions(-)

diff --git a/.cursor/rules/apache-spark-scala-lint-before-commit.mdc 
b/.cursor/rules/apache-spark-scala-lint-before-commit.mdc
deleted file mode 100644
index f3a1d8f746e9..000000000000
--- a/.cursor/rules/apache-spark-scala-lint-before-commit.mdc
+++ /dev/null
@@ -1,11 +0,0 @@
----
-description: Run Spark Scala lint before any commit; never commit without 
passing lint-scala
-alwaysApply: true
----
-
-# Apache Spark: lint before commit
-
-- Before **any** `git commit` that changes Scala (or when preparing a commit 
batch), run `./dev/lint-scala` from the repo root and fix all Scalastyle and 
Scalafmt failures.
-- **Do not** tell the user a change is ready to commit or push until 
`./dev/lint-scala` has passed in this environment (or the user has confirmed it 
passed).
-- Common CI failures: **line length > 100** (wrap Scaladoc and code), import 
order, spacing.
-- If `./dev/lint-scala` fixes files (e.g. scalafmt), include those fixes in 
the same commit.
diff --git a/AGENTS.md b/AGENTS.md
index 3361623f9f18..96f5b7917cae 100644
--- a/AGENTS.md
+++ b/AGENTS.md
@@ -12,8 +12,6 @@ Before the first code edit or running test in a session, 
ensure a clean working
    - **New edits**: ask the user to choose: create a new git worktree from 
`<upstream>/master` and work from there (recommended), or create and switch to 
a new branch from `<upstream>/master`.
    - **Running tests**: use `<upstream>/master`.
 
-5. **Commits:** Before committing or pushing changes that touch Scala (or when 
unsure), run `./dev/lint-scala` from the repo root and fix all failures. Do not 
consider a change merge-ready until lint passes; CI runs the same checks.
-
 ## Development Notes
 
 SQL golden file tests are managed by `SQLQueryTestSuite` and its variants. 
Read the class documentation before running or updating these tests. DO NOT 
edit the generated golden files (`.sql.out`) directly. Always regenerate them 
when needed, and carefully review the diff to make sure it's expected.
diff --git 
a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 
b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
index 89da0d4645c4..735921681cdc 100644
--- 
a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
+++ 
b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
@@ -217,10 +217,6 @@ singleTableSchema
     : colTypeList EOF
     ;
 
-singlePathElementList
-    : pathElement (COMMA pathElement)* EOF
-    ;
-
 singleRoutineParamList
     : colDefinitionList EOF
     ;
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index f31354179674..95b5c54f782f 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1976,15 +1976,14 @@ class Analyzer(
      * This is used for special syntax transformations (e.g., COUNT(*) -> 
COUNT(1)) that
      * should only apply to builtin functions, not to user-defined functions.
      *
-     * When the effective SQL PATH puts `system.session` before 
`system.builtin`, temp
-     * functions shadow builtins, so an unqualified name that matches a temp 
function
-     * should NOT be treated as builtin.
+     * In legacy mode (sessionOrder="first"), temp functions shadow builtins, 
so an
+     * unqualified name that matches a temp function should NOT be treated as 
builtin.
      */
     private def matchesFunctionName(nameParts: Seq[String], expectedName: 
String): Boolean = {
       if (!FunctionResolution.isUnqualifiedOrBuiltinFunctionName(nameParts, 
expectedName)) {
         return false
       }
-      if (nameParts.size == 1 && 
functionResolution.isSessionBeforeBuiltinInPath) {
+      if (nameParts.size == 1 && conf.sessionFunctionResolutionOrder == 
"first") {
         val v1Catalog = catalogManager.v1SessionCatalog
         !v1Catalog.isTemporaryFunction(FunctionIdentifier(nameParts.head))
       } else {
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionResolution.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionResolution.scala
index e3dbcc4b6ef7..4721425d2cb5 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionResolution.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionResolution.scala
@@ -76,22 +76,6 @@ class FunctionResolution(
     nameParts.length == 3 &&
       nameParts.head.equalsIgnoreCase(CatalogManager.SYSTEM_CATALOG_NAME)
 
-  /**
-   * True iff `system.session` is searched before `system.builtin` in the 
effective SQL PATH.
-   *
-   * Drives the `count(*) -> count(1)` rewrite (which must skip transformation 
when a temp
-   * `count` shadows the builtin) and the `SessionCatalog` security check that 
blocks creating
-   * a temp function with a builtin's name. Reads the live PATH via 
`CatalogManager` and
-   * applies the same kinds extraction that drives `SessionCatalog`'s 
fast-path provider, so
-   * the predicate stays in sync with the lookup loop's actual order.
-   */
-  def isSessionBeforeBuiltinInPath: Boolean = {
-    val path = catalogManager.sqlResolutionPathEntries(
-      catalogManager.currentCatalog.name(), 
catalogManager.currentNamespace.toSeq)
-    CatalogManager.systemFunctionKindsFromPath(path).headOption
-      .contains(org.apache.spark.sql.catalyst.catalog.SessionCatalog.Temp)
-  }
-
   /**
    * Produces the ordered list of candidate names for resolution. Expansion 
happens in two cases:
    *
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/FunctionResolver.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/FunctionResolver.scala
index d411dffba0ea..1a8658bb764d 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/FunctionResolver.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/FunctionResolver.scala
@@ -20,12 +20,10 @@ package org.apache.spark.sql.catalyst.analysis.resolver
 import scala.util.Random
 
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.FunctionIdentifier
 import org.apache.spark.sql.catalyst.analysis.{
   FunctionResolution,
   UnresolvedFunction,
-  UnresolvedSeed,
-  UnresolvedStar
+  UnresolvedSeed
 }
 import org.apache.spark.sql.catalyst.expressions.{
   BinaryArithmetic,
@@ -33,7 +31,6 @@ import org.apache.spark.sql.catalyst.expressions.{
   Expression,
   ExpressionWithRandomSeed,
   InheritAnalysisRules,
-  Literal,
   ResolvedCollation,
   TryEval,
   UnresolvedCollation,
@@ -63,7 +60,7 @@ import org.apache.spark.sql.catalyst.util.CollationFactory
  */
 class FunctionResolver(
     expressionResolver: ExpressionResolver,
-    protected val functionResolution: FunctionResolution,
+    functionResolution: FunctionResolution,
     aggregateExpressionResolver: AggregateExpressionResolver,
     binaryArithmeticResolver: BinaryArithmeticResolver)
     extends TreeNodeResolver[UnresolvedFunction, Expression]
@@ -120,10 +117,9 @@ class FunctionResolver(
    *  - Apply timezone, if the resulting expression is 
[[TimeZoneAwareExpression]].
    */
   override def resolve(unresolvedFunction: UnresolvedFunction): Expression = {
-    val effectiveUnresolved = 
restoreCountStarAfterParserForTempShadow(unresolvedFunction)
     val expressionInfo = functionResolution.lookupBuiltinOrTempFunction(
-      nameParts = effectiveUnresolved.nameParts,
-      unresolvedFunc = Some(effectiveUnresolved)
+      nameParts = unresolvedFunction.nameParts,
+      unresolvedFunc = Some(unresolvedFunction)
     )
     val expressionResolutionContext = expressionResolutionContextStack.peek()
 
@@ -137,7 +133,7 @@ class FunctionResolver(
 
     val functionWithResolvedChildren =
       withResolvedChildren(
-        unresolvedExpression = effectiveUnresolved,
+        unresolvedExpression = unresolvedFunction,
         resolveChild = expressionResolver.resolve _
       ).asInstanceOf[UnresolvedFunction]
 
@@ -149,33 +145,6 @@ class FunctionResolver(
     }
   }
 
-  /**
-   * SQL parsing turns `COUNT(*)` into `COUNT(1)` before analysis 
([[AstBuilder]]). When PATH
-   * orders session before builtin and a temporary `count` shadows the 
builtin, that rewrite must
-   * be undone so arguments expand from the child operator output (aligning 
with fixed-point
-   * [[org.apache.spark.sql.catalyst.analysis.Analyzer.ResolveReferences]] / 
`matchesFunctionName`).
-   */
-  private def restoreCountStarAfterParserForTempShadow(
-      unresolvedFunction: UnresolvedFunction): UnresolvedFunction = {
-    if (unresolvedFunction.nameParts.length != 1 ||
-      unresolvedFunction.isDistinct ||
-      !unresolvedFunction.nameParts.head.equalsIgnoreCase("count")) {
-      return unresolvedFunction
-    }
-    unresolvedFunction.arguments match {
-      case Seq(lit: Literal)
-          if lit.value != null &&
-            lit.value == 1 &&
-            functionResolution.isSessionBeforeBuiltinInPath &&
-            
functionResolution.catalogManager.v1SessionCatalog.isTemporaryFunction(
-              FunctionIdentifier(unresolvedFunction.nameParts.head)) =>
-        val expanded = 
expressionResolver.expandStarExpressions(Seq(UnresolvedStar(None)))
-        unresolvedFunction.copy(arguments = expanded)
-      case _ =>
-        unresolvedFunction
-    }
-  }
-
   private def handlePartiallyResolvedFunction(partiallyResolvedFunction: 
Expression): Expression = {
     val expressionResolutionContext = expressionResolutionContextStack.peek()
     val resolvedFunction = partiallyResolvedFunction match {
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/FunctionResolverUtils.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/FunctionResolverUtils.scala
index 3c5a3f1832e8..503c94fc9cdf 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/FunctionResolverUtils.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/FunctionResolverUtils.scala
@@ -19,9 +19,7 @@ package org.apache.spark.sql.catalyst.analysis.resolver
 
 import java.util.Locale
 
-import org.apache.spark.sql.catalyst.FunctionIdentifier
 import org.apache.spark.sql.catalyst.analysis.{
-  FunctionResolution,
   ResolvedStar,
   Star,
   UnresolvedFunction,
@@ -37,7 +35,6 @@ import org.apache.spark.sql.internal.SQLConf
  */
 trait FunctionResolverUtils {
   protected def expressionResolver: ExpressionResolver
-  protected def functionResolution: FunctionResolution
   protected def conf: SQLConf
 
   private val scopes = expressionResolver.getNameScopes
@@ -102,21 +99,7 @@ trait FunctionResolverUtils {
       unresolvedFunction: UnresolvedFunction,
       normalizeFunctionName: Boolean = true
   ): Boolean = {
-    !unresolvedFunction.isDistinct &&
-      isCount(unresolvedFunction, normalizeFunctionName) &&
-      !isUnqualifiedCountShadowedByTemp(unresolvedFunction)
-  }
-
-  /**
-   * Keep single-pass behavior aligned with fixed-point: when PATH puts 
system.session before
-   * system.builtin and a temp `count` exists, unqualified `count(*)` must not 
be rewritten to
-   * `count(1)`.
-   */
-  private def isUnqualifiedCountShadowedByTemp(unresolvedFunction: 
UnresolvedFunction): Boolean = {
-    unresolvedFunction.nameParts.length == 1 &&
-      functionResolution.isSessionBeforeBuiltinInPath &&
-      functionResolution.catalogManager.v1SessionCatalog
-        
.isTemporaryFunction(FunctionIdentifier(unresolvedFunction.nameParts.head))
+    !unresolvedFunction.isDistinct && isCount(unresolvedFunction, 
normalizeFunctionName)
   }
 
   private def isCount(
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/HigherOrderFunctionResolver.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/HigherOrderFunctionResolver.scala
index 676ef381f2f1..6b90a5c05baf 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/HigherOrderFunctionResolver.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/HigherOrderFunctionResolver.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.errors.QueryCompilationErrors
  */
 class HigherOrderFunctionResolver(
     protected val expressionResolver: ExpressionResolver,
-    protected val functionResolution: FunctionResolution)
+    functionResolution: FunctionResolution)
     extends TreeNodeResolver[UnresolvedFunction, Expression]
     with ProducesUnresolvedSubtree
     with CoercesExpressionTypes
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/ResolutionValidator.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/ResolutionValidator.scala
index 793df7dc44a7..6bdf2d4b0615 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/ResolutionValidator.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/ResolutionValidator.scala
@@ -107,9 +107,6 @@ class ResolutionValidator {
         validateRepartitionByExpression(repartitionByExpression)
       case sample: Sample =>
         validateSample(sample)
-      case deserializeToObject: DeserializeToObject =>
-        validate(deserializeToObject.child)
-        handleOperatorOutput(deserializeToObject)
       case generate: Generate =>
         validateGenerate(generate)
       case expand: Expand =>
@@ -121,9 +118,6 @@ class ResolutionValidator {
         validateRelation(multiInstanceRelation)
       case supervisingCommand: SupervisingCommand =>
         validateSupervisingCommand(supervisingCommand)
-      case cmd: Command if cmd.children.isEmpty =>
-        // Session side-effect commands (e.g. SET PATH) are leaves with no 
query subtree.
-        ()
     }
 
     operator match {
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/Resolver.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/Resolver.scala
index a811f5ce6d88..aaf7117ef4e8 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/Resolver.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/Resolver.scala
@@ -136,15 +136,6 @@ class Resolver(
    */
   private val planRewriter = new PlanRewriter(planRewriteRules, 
extendedRewriteRules)
 
-  /**
-   * Runs session post-resolution rules (see [[extendedRewriteRules]]) on a 
subtree. Used for
-   * operators such as [[DeserializeToObject]] whose deserializer is resolved 
by the same rules as
-   * the end-of-resolution [[PlanRewriter]] batch, but which must be applied 
before single-pass
-   * validation asserts the operator is fully resolved.
-   */
-  private def applyExtendedRewriteRulesLocally(plan: LogicalPlan): LogicalPlan 
=
-    extendedRewriteRules.foldLeft(plan) { case (p, rule) => rule(p) }
-
   /**
    * [[relationMetadataProvider]] is used to resolve metadata for relations. 
It's initialized with
    * the default implementation [[MetadataResolver]] here and is called in
@@ -334,9 +325,6 @@ class Resolver(
             resolveRepartition(repartition)
           case sample: Sample =>
             resolveSample(sample)
-          case deserializeToObject: DeserializeToObject =>
-            applyExtendedRewriteRulesLocally(
-              deserializeToObject.copy(child = 
resolve(deserializeToObject.child)))
           case _ =>
             tryDelegateResolutionToExtension(unresolvedPlan).getOrElse {
               handleUnmatchedOperator(unresolvedPlan)
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 249700ec0d92..32aa8cccbd93 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -113,66 +113,17 @@ class SessionCatalog(
     identifier.copy(funcName = "") == SESSION_NAMESPACE_TEMPLATE
 
   /**
-   * When set, unqualified builtin/temp function resolution uses this fixed 
kind order instead of
-   * [[catalogManagerForSessionFunctionKinds]] / [[SQLConf.systemPathOrder]]. 
For unit tests only;
-   * production relies on the catalog manager binding.
+   * Session function kinds in resolution order for unqualified lookups.
+   * Matches [[SQLConf.sessionFunctionResolutionOrder]]: "first" (session 
first),
+   * "second" (default), "last" (builtin only; session tried after persistent).
    */
-  @volatile private var sessionFunctionKindsTestOverride: 
Option[Seq[SessionFunctionKind]] = None
-
-  /**
-   * Live PATH for session function kinds. Set from
-   * [[org.apache.spark.sql.connector.catalog.CatalogManager]]'s constructor 
via
-   * [[bindCatalogManagerForSessionFunctionKinds]] so unqualified lookups and 
the security check
-   * that blocks temp functions from shadowing builtins read the effective SQL 
PATH (post-`SET
-   * PATH`, with [[SQLConf.DEFAULT_PATH]] and [[SQLConf.defaultPathOrder]] 
fallbacks already
-   * applied).
-   *
-   * When unset (e.g. standalone [[SessionCatalog]] in tests), kinds derive 
from
-   * [[SQLConf.systemPathOrder]] -- the seeded default path -- without 
assuming other legacy
-   * resolution-order conf beyond seeding `defaultPathOrder`.
-   */
-  @volatile private var catalogManagerForSessionFunctionKinds: 
Option[CatalogManager] = None
-
-  /**
-   * Wire live PATH-derived session function kinds from the session 
[[CatalogManager]].
-   * Called once from 
[[org.apache.spark.sql.connector.catalog.CatalogManager]]'s constructor.
-   */
-  private[sql] def bindCatalogManagerForSessionFunctionKinds(cm: 
CatalogManager): Unit = {
-    catalogManagerForSessionFunctionKinds = Some(cm)
-  }
-
-  /**
-   * Pin session function kinds for tests (`None` clears). Uses `private[sql]` 
so tests under the
-   * `org.apache.spark.sql` package can control ordering without a public 
catalog API.
-   */
-  private[sql] def setSessionFunctionKindsTestOverride(
-      kinds: Option[Seq[SessionFunctionKind]]): Unit = {
-    sessionFunctionKindsTestOverride = kinds
-  }
-
-  /**
-   * Session function kinds in resolution order for unqualified lookups: test 
override if set,
-   * else live PATH from [[catalogManagerForSessionFunctionKinds]], else
-   * [[SQLConf.systemPathOrder]].
-   */
-  private def sessionFunctionKindsInResolutionOrder: Seq[SessionFunctionKind] =
-    sessionFunctionKindsTestOverride.getOrElse {
-      catalogManagerForSessionFunctionKinds match {
-        case Some(cm) =>
-          CatalogManager.systemFunctionKindsFromPath(
-            cm.sqlResolutionPathEntries(cm.currentCatalog.name(), 
cm.currentNamespace.toSeq))
-        case None =>
-          CatalogManager.systemFunctionKindsFromPath(conf.systemPathOrder)
-      }
+  private def sessionFunctionKindsInResolutionOrder: Seq[SessionFunctionKind] 
= {
+    conf.sessionFunctionResolutionOrder match {
+      case "first" => Seq(Temp, Builtin)
+      case "last" => Seq(Builtin)
+      case _ => Seq(Builtin, Temp) // "second" (default)
     }
-
-  /**
-   * True iff the effective SQL PATH searches `system.session` before 
`system.builtin`. Used
-   * to gate the security check that blocks temporary functions from silently 
shadowing a
-   * builtin of the same name.
-   */
-  private def sessionFirstInPath: Boolean =
-    sessionFunctionKindsInResolutionOrder.headOption.contains(Temp)
+  }
 
   /**
    * Checks if a namespace represents temporary functions.
@@ -2129,11 +2080,12 @@ class SessionCatalog(
       qualifyIdentifier(func)
     }
 
-    // Security check: when the effective SQL PATH searches `system.session` 
before
-    // `system.builtin`, block creating an unqualified temporary function 
whose name
-    // collides with a builtin so it cannot silently shadow that builtin via 
unqualified
-    // resolution. We throw ROUTINE_ALREADY_EXISTS to indicate the conflict.
-    if (func.database.isEmpty && sessionFirstInPath && !overrideIfExists) {
+    // Security check: When legacy mode is enabled, block SQL-created 
temporary functions
+    // from shadowing builtin functions (to preserve master behavior)
+    // Scala UDFs are still allowed to shadow in legacy mode
+    // We throw ROUTINE_ALREADY_EXISTS to indicate the builtin function 
already exists
+    val sessionFirst = conf.sessionFunctionResolutionOrder == "first"
+    if (func.database.isEmpty && sessionFirst && !overrideIfExists) {
       val funcName = func.funcName
       // Check if function exists in builtin namespace (extensions are stored 
as builtins)
       val builtinIdent = FunctionRegistry.builtinFunctionIdentifier(funcName)
@@ -2243,11 +2195,10 @@ class SessionCatalog(
       // Use FunctionIdentifier with session namespace for temporary functions
       val tempIdentifier = tempFunctionIdentifier(function.name.funcName)
 
-      // Security check: when the effective SQL PATH searches `system.session` 
before
-      // `system.builtin`, block creating an unqualified temporary function 
whose name
-      // collides with a builtin (including extensions) so it cannot silently 
shadow that
-      // builtin via unqualified resolution.
-      if (sessionFirstInPath && !overrideIfExists) {
+      // Security check: When legacy mode is enabled, block SQL-created 
temporary functions
+      // from shadowing builtin functions (including extensions) as a safeguard
+      // We throw ROUTINE_ALREADY_EXISTS to indicate the builtin function 
already exists
+      if ((conf.sessionFunctionResolutionOrder == "first") && 
!overrideIfExists) {
         val funcName = function.name.funcName
         // Check if function exists in builtin namespace (extensions are 
stored as builtins)
         val builtinIdent = FunctionRegistry.builtinFunctionIdentifier(funcName)
@@ -2548,12 +2499,7 @@ class SessionCatalog(
    * Look up the `ExpressionInfo` of the given function by name.
    * Resolution order follows the configured path (e.g. builtin then session).
    */
-  def lookupBuiltinOrTempTableFunction(name: String): Option[ExpressionInfo] = 
{
-    // Intentionally not `synchronized` on this [[SessionCatalog]]. Resolution 
order may call
-    // into [[CatalogManager]] (e.g. 
[[CatalogManager.sqlResolutionPathEntries]]), which can
-    // synchronize on the manager; another
-    // thread can hold that lock and call into this catalog (e.g. via 
`setCurrentNamespace`),
-    // which would deadlock if this method also synchronized on `this`.
+  def lookupBuiltinOrTempTableFunction(name: String): Option[ExpressionInfo] = 
synchronized {
     lookupFunctionWithShadowing(name, tableFunctionRegistry, 
checkBuiltinOperators = false)
   }
 
@@ -2704,11 +2650,7 @@ class SessionCatalog(
   /**
    * Look up the [[ExpressionInfo]] associated with the specified function, 
assuming it exists.
    */
-  def lookupFunctionInfo(name: FunctionIdentifier): ExpressionInfo = {
-    // Intentionally not `synchronized` on this [[SessionCatalog]] (see
-    // [[lookupBuiltinOrTempTableFunction]]): unqualified builtin/temp 
resolution uses
-    // [[sessionFunctionKindsInResolutionOrder]] / [[CatalogManager]] and must 
not run under
-    // this catalog's intrinsic lock.
+  def lookupFunctionInfo(name: FunctionIdentifier): ExpressionInfo = 
synchronized {
     if (name.database.isEmpty) {
       lookupBuiltinOrTempFunction(name.funcName)
         .orElse(lookupBuiltinOrTempTableFunction(name.funcName))
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AbstractSqlParser.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AbstractSqlParser.scala
index 29bf924f244e..216136d8a7c8 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AbstractSqlParser.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AbstractSqlParser.scala
@@ -23,7 +23,6 @@ import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.parser.ParserUtils.withOrigin
 import org.apache.spark.sql.catalyst.plans.logical.{CompoundPlanStatement, 
LogicalPlan}
 import org.apache.spark.sql.catalyst.trees.Origin
-import org.apache.spark.sql.connector.catalog.PathElement
 import org.apache.spark.sql.errors.QueryParsingErrors
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StructType
@@ -111,18 +110,6 @@ abstract class AbstractSqlParser extends AbstractParser 
with ParserInterface {
     }
   }
 
-  /**
-   * Parse the right-hand side of `SET PATH = ...` (a comma-separated list of 
path elements).
-   * Used by [[org.apache.spark.sql.connector.catalog.CatalogManager]] to 
honor the
-   * [[SQLConf.DEFAULT_PATH]] conf without re-implementing the SET PATH 
grammar.
-   */
-  private[sql] def parsePathElements(sqlText: String): Seq[PathElement] = 
parse(sqlText) { parser =>
-    val ctx = parser.singlePathElementList()
-    withErrorHandling(ctx, Some(sqlText)) {
-      astBuilder.visitSinglePathElementList(ctx)
-    }
-  }
-
   def withErrorHandling[T](ctx: ParserRuleContext, sqlText: 
Option[String])(toResult: => T): T = {
     withOrigin(ctx, sqlText) {
       try {
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index da232a99f19b..12446b7cc9d7 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -47,7 +47,7 @@ import 
org.apache.spark.sql.catalyst.trees.TreePattern.PARAMETER
 import org.apache.spark.sql.catalyst.types.DataTypeUtils
 import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, CollationFactory, 
DateTimeUtils, EvaluateUnresolvedInlineTable, IntervalUtils}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils.{convertSpecialDate, 
convertSpecialTimestamp, convertSpecialTimestampNTZ, getZoneId, stringToDate, 
stringToTime, stringToTimestamp, stringToTimestampWithoutTimeZone}
-import org.apache.spark.sql.connector.catalog.{CatalogV2Util, ChangelogInfo, 
PathElement, SupportsNamespaces, TableCatalog, TableWritePrivilege}
+import org.apache.spark.sql.connector.catalog.{CatalogV2Util, ChangelogInfo, 
SupportsNamespaces, TableCatalog, TableWritePrivilege}
 import org.apache.spark.sql.connector.catalog.ChangelogRange.{TimestampRange, 
UnboundedRange, VersionRange}
 import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition
 import org.apache.spark.sql.connector.expressions.{ApplyTransform, 
BucketTransform, DaysTransform, Expression => V2Expression, FieldReference, 
HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, Transform, 
YearsTransform}
@@ -708,26 +708,6 @@ class AstBuilder extends DataTypeAstBuilder
     visitMultipartIdentifier(ctx.multipartIdentifier)
   }
 
-  override def visitSinglePathElementList(
-      ctx: SinglePathElementListContext): Seq[PathElement] = withOrigin(ctx) {
-    ctx.pathElement().asScala.map(visitPathElement).toSeq
-  }
-
-  override def visitPathElement(ctx: PathElementContext): PathElement = 
withOrigin(ctx) {
-    if (ctx.DEFAULT_PATH() != null) PathElement.DefaultPath
-    else if (ctx.SYSTEM_PATH() != null) PathElement.SystemPath
-    else if (ctx.PATH() != null) PathElement.PathRef
-    else if (ctx.CURRENT_DATABASE() != null || ctx.CURRENT_SCHEMA() != null) {
-      PathElement.CurrentSchema
-    } else {
-      val parts = visitMultipartIdentifier(ctx.multipartIdentifier())
-      if (parts.length < 2) {
-        throw 
QueryCompilationErrors.invalidSqlPathSchemaReferenceError(parts.mkString("."))
-      }
-      PathElement.SchemaInPath(parts)
-    }
-  }
-
   override def visitSingleDataType(ctx: SingleDataTypeContext): DataType = 
withOrigin(ctx) {
     typedVisit[DataType](ctx.dataType)
   }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala
index 5f4c71297ac5..f73b0e68cecc 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql.connector.catalog
 
-import java.util.concurrent.atomic.AtomicReference
-
 import scala.collection.mutable
 import scala.util.Try
 
@@ -26,7 +24,6 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.SQLConfHelper
 import org.apache.spark.sql.catalyst.catalog.{SessionCatalog, 
TempVariableManager}
-import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.util.StringUtils
 import org.apache.spark.sql.connector.catalog.transactions.Transaction
 import org.apache.spark.sql.errors.QueryCompilationErrors
@@ -55,12 +52,6 @@ class CatalogManager(
   // TODO: create a real SYSTEM catalog to host `TempVariableManager` under 
the SESSION namespace.
   val tempVariableManager: TempVariableManager = new TempVariableManager
 
-  // Wire `SessionCatalog`'s fast-path kinds to the live SQL PATH. The kinds 
list itself is
-  // pure data conversion (system entries from the path, in path order); the 
*decision* to use
-  // path-order kinds for unqualified lookups lives at the Strategy layer (see 
callers of
-  // [[CatalogManager.systemFunctionKindsFromPath]]).
-  v1SessionCatalog.bindCatalogManagerForSessionFunctionKinds(this)
-
   def catalog(name: String): CatalogPlugin = synchronized {
     if (name.equalsIgnoreCase(SESSION_CATALOG_NAME)) {
       v2SessionCatalog
@@ -147,60 +138,8 @@ class CatalogManager(
 
   private var _sessionPath: Option[Seq[SessionPathEntry]] = None
 
-  /**
-   * Cache for [[confDefaultPathEntries]]: stores the expanded 
[[SessionPathEntry]] list keyed
-   * on the trimmed [[SQLConf.DEFAULT_PATH]] string and
-   * [[SQLConf.SESSION_FUNCTION_RESOLUTION_ORDER]] value (the only conf that 
affects the
-   * expansion of `DEFAULT_PATH` / `SYSTEM_PATH` tokens).
-   * `CurrentSchemaEntry` markers are preserved unresolved so the cache stays 
valid across
-   * `USE SCHEMA`.
-   */
-  private val confDefaultPathCache =
-    new AtomicReference[Option[(String, String, Seq[SessionPathEntry])]](None)
-
-  /**
-   * Returns the effective session path entries: the explicit `SET PATH` value 
if stored,
-   * else the parsed [[SQLConf.DEFAULT_PATH]] conf if non-empty (mirroring how
-   * [[currentCatalog]] falls back to [[SQLConf.DEFAULT_CATALOG]]). Returns 
`None` when
-   * [[SQLConf.PATH_ENABLED]] is false or both sources are empty.
-   */
-  def sessionPathEntries: Option[Seq[SessionPathEntry]] = synchronized {
-    if (!conf.pathEnabled) None
-    else _sessionPath.orElse(confDefaultPathEntries)
-  }
-
-  /** Raw `_sessionPath` (post-`SET PATH`), without the 
[[SQLConf.DEFAULT_PATH]] fallback. */
-  def storedSessionPathEntries: Option[Seq[SessionPathEntry]] = synchronized { 
_sessionPath }
-
-  /**
-   * Parsed and expanded [[SQLConf.DEFAULT_PATH]] value, or `None` when the 
conf is empty.
-   * Reuses the SET PATH grammar via [[CatalystSqlParser.parsePathElements]]. 
An inner
-   * `DEFAULT_PATH` token resolves to the spark-builtin default ordering 
(cycle break).
-   *
-   * Unlike `SET PATH`, this does NOT run a duplicate check: lookup uses 
first-match
-   * resolution, so any redundant entry (including ones that only collide 
after a later
-   * `USE SCHEMA`) is dead code rather than an error. Cached so the hot path 
is a single
-   * atomic load on conf-stable sessions.
-   */
-  def confDefaultPathEntries: Option[Seq[SessionPathEntry]] = {
-    val confValue = conf.defaultPath
-    if (confValue == null || confValue.trim.isEmpty) {
-      confDefaultPathCache.set(None)
-      None
-    } else {
-      val trimmed = confValue.trim
-      val sessionOrder = conf.sessionFunctionResolutionOrder
-      val expanded = confDefaultPathCache.get() match {
-        case Some((k, ord, cached)) if k == trimmed && ord == sessionOrder => 
cached
-        case _ =>
-          val elements = CatalystSqlParser.parsePathElements(trimmed)
-          val computed = PathElement.expand(elements, conf, this, 
isConfDefaultExpansion = true)
-          confDefaultPathCache.set(Some((trimmed, sessionOrder, computed)))
-          computed
-      }
-      if (expanded.isEmpty) None else Some(expanded)
-    }
-  }
+  /** Returns the raw stored session path entries, or None if no path is set. 
*/
+  def sessionPathEntries: Option[Seq[SessionPathEntry]] = synchronized { 
_sessionPath }
 
   def setSessionPath(entries: Seq[SessionPathEntry]): Unit = synchronized {
     _sessionPath = Some(entries)
@@ -211,18 +150,18 @@ class CatalogManager(
   }
 
   private[sql] def copySessionPathFrom(other: CatalogManager): Unit = 
synchronized {
-    _sessionPath = other.storedSessionPathEntries
+    _sessionPath = other.sessionPathEntries
   }
 
   /**
    * String form of the current resolution path for CURRENT_PATH().
-   * When PATH is enabled and a session path is in effect (stored or via
-   * [[SQLConf.DEFAULT_PATH]]), formats the resolved entries. Otherwise falls 
back to the legacy
-   * resolutionSearchPath.
+   * When PATH is enabled and a session path is stored, formats the effective 
path entries
+   * with markers expanded. Otherwise falls back to the legacy 
resolutionSearchPath.
    */
   def currentPathString: String = synchronized {
     import CatalogV2Implicits._
-    sessionPathEntries match {
+    val stored = if (conf.pathEnabled) _sessionPath else None
+    stored match {
       case Some(entries) =>
         val resolved = CatalogManager.resolvePathEntries(
           entries, currentCatalog.name(), currentNamespace.toSeq)
@@ -236,8 +175,7 @@ class CatalogManager(
   /**
    * Ordered catalog/schema path entries for resolving unqualified SQL object 
names.
    * When PATH is off or unset, applies [[SQLConf.defaultPathOrder]] (legacy).
-   * When PATH is in effect (stored or via the [[SQLConf.DEFAULT_PATH]] conf), 
uses the
-   * resolved entries.
+   * When PATH is explicitly set, uses the resolved stored path entries.
    */
   def sqlResolutionPathEntries(
       pathDefaultCatalog: String,
@@ -247,7 +185,8 @@ class CatalogManager(
     val defaultEntry =
       if (pathDefaultNamespace.isEmpty) Seq(pathDefaultCatalog)
       else pathDefaultCatalog +: pathDefaultNamespace
-    sessionPathEntries match {
+    val stored = if (conf.pathEnabled) _sessionPath else None
+    stored match {
       case Some(entries) =>
         CatalogManager.resolvePathEntries(entries, expandCatalog, 
expandNamespace)
       case None =>
@@ -270,11 +209,11 @@ class CatalogManager(
    * [[org.apache.spark.sql.catalyst.analysis.FakeSystemCatalog]] / 
`lookupBuiltinOrTempFunction`,
    * not loadable via [[catalog]]), so `currentCatalog.name()` cannot be 
`"system"`. If that
    * invariant ever changes, this short-circuit must be revisited.
-   * Inspecting effective entries directly avoids loading the configured 
default catalog.
+   * Inspecting stored entries directly avoids loading the configured default 
catalog.
    */
   def isSystemSessionOnPath: Boolean = synchronized {
     if (!conf.pathEnabled) return true
-    sessionPathEntries match {
+    _sessionPath match {
       case None => true
       case Some(entries) => entries.exists {
         case CatalogManager.LiteralPathEntry(parts) =>
@@ -350,7 +289,6 @@ class CatalogManager(
     _currentNamespace = None
     _currentCatalogName = None
     _sessionPath = None
-    confDefaultPathCache.set(None)
     v1SessionCatalog.setCurrentDatabase(conf.defaultDatabase)
   }
 }
@@ -402,30 +340,9 @@ private[sql] object CatalogManager extends Logging {
       isFullyQualifiedSystemSessionViewName(nameParts)
   }
 
-  /** True if a SQL path entry is the well-known `system.session` entry 
(case-insensitive). */
+  /** True if a SQL path entry is the well-known `system.session` entry. */
   def isSystemSessionPathEntry(parts: Seq[String]): Boolean =
-    parts.length == 2 &&
-      parts.head.equalsIgnoreCase(SYSTEM_CATALOG_NAME) &&
-      parts(1).equalsIgnoreCase(SESSION_NAMESPACE)
-
-  /** True if a SQL path entry is the well-known `system.builtin` entry 
(case-insensitive). */
-  def isSystemBuiltinPathEntry(parts: Seq[String]): Boolean =
-    parts.length == 2 &&
-      parts.head.equalsIgnoreCase(SYSTEM_CATALOG_NAME) &&
-      parts(1).equalsIgnoreCase(BUILTIN_NAMESPACE)
-
-  /**
-   * Extract `system.builtin` / `system.session` entries from a resolved PATH, 
mapped to
-   * [[SessionCatalog.SessionFunctionKind]] in path order. Pure data 
conversion -- callers
-   * decide whether and how to use this list.
-   */
-  def systemFunctionKindsFromPath(
-      path: Seq[Seq[String]]): Seq[SessionCatalog.SessionFunctionKind] =
-    path.flatMap { e =>
-      if (isSystemBuiltinPathEntry(e)) Some(SessionCatalog.Builtin)
-      else if (isSystemSessionPathEntry(e)) Some(SessionCatalog.Temp)
-      else None
-    }
+    parts == Seq(SYSTEM_CATALOG_NAME, SESSION_NAMESPACE)
 
   /**
    * A single entry in the session SQL path: either a literal schema
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/PathElement.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/PathElement.scala
deleted file mode 100644
index ad5cf0e161ea..000000000000
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/PathElement.scala
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.connector.catalog
-
-import java.util.Locale
-
-import scala.collection.mutable
-
-import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.connector.catalog.CatalogManager.{
-  CurrentSchemaEntry, LiteralPathEntry, SessionPathEntry
-}
-import org.apache.spark.sql.internal.SQLConf
-
-/**
- * One element on the right-hand side of `SET PATH = ...`: either a well-known 
shortcut
- * keyword (DEFAULT_PATH, SYSTEM_PATH, PATH, CURRENT_SCHEMA / 
CURRENT_DATABASE) or a
- * fully qualified schema reference (`catalog.namespace...` with at least 2 
parts).
- *
- * The same grammar is reused to parse the [[SQLConf.DEFAULT_PATH]] conf 
value, so this
- * AST node lives in catalyst beside [[CatalogManager]] rather than in the 
runtime
- * [[org.apache.spark.sql.execution.command.SetPathCommand]].
- */
-private[sql] sealed trait PathElement
-
-private[sql] object PathElement {
-  case object DefaultPath extends PathElement
-  case object SystemPath extends PathElement
-  case object PathRef extends PathElement
-
-  /**
-   * Current database/schema (SQL aliases). Stored as the 
[[CurrentSchemaEntry]] marker
-   * so resolution candidates expand against the live `USE SCHEMA`.
-   */
-  case object CurrentSchema extends PathElement
-
-  /** Fully qualified schema reference (`catalog.namespace...`). Must have at 
least 2 parts. */
-  case class SchemaInPath(parts: Seq[String]) extends PathElement
-
-  /**
-   * Expand a parsed [[PathElement]] list into concrete [[SessionPathEntry]] 
entries
-   * suitable for storing in [[CatalogManager._sessionPath]] or returning from
-   * [[CatalogManager.sessionPathEntries]].
-   *
-   * @param isConfDefaultExpansion when true, an inner [[DefaultPath]] token 
resolves
-   *                               to the spark-builtin default ordering 
(cycle break)
-   *                               rather than reading 
[[SQLConf.DEFAULT_PATH]] again.
-   *                               Set to true when this method is invoked 
while
-   *                               parsing [[SQLConf.DEFAULT_PATH]] itself.
-   */
-  def expand(
-      elements: Seq[PathElement],
-      conf: SQLConf,
-      catalogManager: CatalogManager,
-      isConfDefaultExpansion: Boolean = false): Seq[SessionPathEntry] = {
-    val currentSchemaSentinel = Seq("__current_schema__")
-
-    def toEntries(parts: Seq[Seq[String]]): Seq[SessionPathEntry] = parts.map {
-      case p if p == currentSchemaSentinel => CurrentSchemaEntry
-      case p => LiteralPathEntry(p)
-    }
-
-    def builtinDefaultWithCurrentSchema: Seq[SessionPathEntry] =
-      toEntries(conf.defaultPathOrder(Seq(currentSchemaSentinel)))
-
-    def defaultPathExpansion: Seq[SessionPathEntry] = {
-      if (isConfDefaultExpansion) {
-        // Cycle break: inner DEFAULT_PATH inside the conf default value falls 
back to the
-        // spark-builtin default ordering instead of recursing.
-        builtinDefaultWithCurrentSchema
-      } else {
-        
catalogManager.confDefaultPathEntries.getOrElse(builtinDefaultWithCurrentSchema)
-      }
-    }
-
-    elements.flatMap {
-      case DefaultPath =>
-        defaultPathExpansion
-      case SystemPath =>
-        toEntries(conf.systemPathOrder)
-      case CurrentSchema =>
-        Seq(CurrentSchemaEntry)
-      case PathRef =>
-        catalogManager.storedSessionPathEntries.getOrElse(defaultPathExpansion)
-      case SchemaInPath(parts) =>
-        Seq(LiteralPathEntry(parts))
-    }
-  }
-
-  /**
-   * Reject *static* duplicates in a SET PATH entry list: identical 
[[LiteralPathEntry]] parts
-   * and repeated [[CurrentSchemaEntry]] markers (the `current_schema` / 
`current_database`
-   * cross-alias case). Used for the interactive `SET PATH` form to surface 
user typos at
-   * statement time.
-   *
-   * Deliberately does NOT compare a [[LiteralPathEntry]] against a 
[[CurrentSchemaEntry]]:
-   * such a "duplicate" depends on the live `USE SCHEMA` and is harmless at 
lookup (first-match
-   * resolution skips the dead literal). [[SQLConf.DEFAULT_PATH]] expansion 
skips this check
-   * entirely so transient `USE`-induced collisions don't wedge unqualified 
resolution.
-   */
-  def validateNoStaticDuplicates(
-      entries: Seq[SessionPathEntry],
-      caseSensitive: Boolean): Seq[SessionPathEntry] = {
-    val seenLiterals = new mutable.HashSet[Seq[String]]
-    var seenCurrentSchema = false
-    entries.foreach {
-      case CurrentSchemaEntry =>
-        if (seenCurrentSchema) {
-          throw new AnalysisException(
-            errorClass = "DUPLICATE_SQL_PATH_ENTRY",
-            messageParameters = Map("pathEntry" -> "current_schema"))
-        }
-        seenCurrentSchema = true
-      case LiteralPathEntry(parts) =>
-        val key = if (caseSensitive) parts else 
parts.map(_.toLowerCase(Locale.ROOT))
-        if (!seenLiterals.add(key)) {
-          throw new AnalysisException(
-            errorClass = "DUPLICATE_SQL_PATH_ENTRY",
-            messageParameters = Map(
-              "pathEntry" ->
-                parts.map(p => if (p.contains(".")) s"`$p`" else 
p).mkString(".")))
-        }
-    }
-    entries
-  }
-}
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index f9144378c1f3..46574b2f489b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -43,11 +43,9 @@ import org.apache.spark.sql.catalyst.ScalaReflection
 import org.apache.spark.sql.catalyst.analysis.{HintErrorLogger, Resolver}
 import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode
 import org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator
-import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.plans.logical.HintErrorHandler
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import 
org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
-import org.apache.spark.sql.connector.catalog.PathElement.PathRef
 import org.apache.spark.sql.errors.{QueryCompilationErrors, 
QueryExecutionErrors}
 import org.apache.spark.sql.types.{AtomicType, TimestampNTZType, TimestampType}
 import org.apache.spark.storage.{StorageLevel, StorageLevelMapper}
@@ -2473,29 +2471,6 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
-  val DEFAULT_PATH =
-    buildConf("spark.sql.defaultPath")
-      .version("4.2.0")
-      .doc("Default SQL PATH used when no SET PATH has been issued in the 
session; this is " +
-        "also the value to which `SET PATH = DEFAULT_PATH` expands. Accepts 
the full SET PATH " +
-        "grammar; an inner DEFAULT_PATH token resolves to the spark-builtin 
default ordering. " +
-        "The PATH keyword is not allowed in this conf value. " +
-        "When empty, the spark-builtin default ordering controlled by " +
-        "`spark.sql.functionResolution.sessionOrder` applies. Validated for 
syntax at set time; " +
-        "redundant entries are tolerated (lookup uses first-match resolution). 
The interactive " +
-        "SET PATH form still rejects static duplicates as a typo guard.")
-      .withBindingPolicy(ConfigBindingPolicy.SESSION)
-      .stringConf
-      .checkValue(
-        v =>
-          v == null || v.trim.isEmpty ||
-            Try(CatalystSqlParser.parsePathElements(v.trim))
-              .toOption
-              .exists(!_.contains(PathRef)),
-        "The value must be empty or a comma-separated SET PATH element list " +
-          "(same grammar as SET PATH, except PATH is not allowed).")
-      .createWithDefault("")
-
   // Whether to retain group by columns or not in GroupedData.agg.
   val DATAFRAME_RETAIN_GROUP_COLUMNS = 
buildConf("spark.sql.retainGroupColumns")
     .version("1.4.0")
@@ -8580,8 +8555,6 @@ class SQLConf extends Serializable with Logging with 
SqlApiConf {
 
   def pathEnabled: Boolean = getConf(SQLConf.PATH_ENABLED)
 
-  def defaultPath: String = getConf(SQLConf.DEFAULT_PATH)
-
   /**
    * Returns the resolution search path for error messages and resolution 
order.
    * This is the single source of truth for the search path used for 
functions, tables, and views.
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index b4ece7329094..8f4c77840f0c 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -359,7 +359,15 @@ class SparkSqlAstBuilder extends AstBuilder {
    * }}}
    */
   override def visitSetPath(ctx: SetPathContext): LogicalPlan = 
withOrigin(ctx) {
-    SetPathCommand(ctx.pathElement().asScala.map(visitPathElement).toSeq)
+    val elements = ctx.pathElement().asScala.map { pe =>
+      if (pe.DEFAULT_PATH() != null) PathElement.DefaultPath
+      else if (pe.SYSTEM_PATH() != null) PathElement.SystemPath
+      else if (pe.PATH() != null) PathElement.PathRef
+      else if (pe.CURRENT_DATABASE() != null) PathElement.CurrentDatabase
+      else if (pe.CURRENT_SCHEMA() != null) PathElement.CurrentSchema
+      else 
PathElement.SchemaInPath(visitMultipartIdentifier(pe.multipartIdentifier()))
+    }.toSeq
+    SetPathCommand(elements)
   }
 
   /**
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/LeafRunnableCommandResolver.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/LeafRunnableCommandResolver.scala
deleted file mode 100644
index 07ac4afd432b..000000000000
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/LeafRunnableCommandResolver.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.command
-
-import org.apache.spark.sql.catalyst.analysis.resolver.{
-  LogicalPlanResolver,
-  ResolverExtension
-}
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-
-/**
- * [[ResolverExtension]] so the single-pass analyzer accepts 
[[LeafRunnableCommand]] plans.
- * These commands have no logical-plan children ([[RunnableCommand]] reports 
an empty child
- * list); nested SQL or expressions are parsed and analyzed inside 
[[RunnableCommand.run]].
- */
-private[sql] class LeafRunnableCommandResolver extends ResolverExtension {
-  override def resolveOperator(
-      operator: LogicalPlan,
-      resolver: LogicalPlanResolver): Option[LogicalPlan] = operator match {
-    case cmd: LeafRunnableCommand =>
-      Some(cmd)
-    case _ =>
-      None
-  }
-}
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetPathCommand.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetPathCommand.scala
index 82ab46ec9b14..70538160eefd 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetPathCommand.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetPathCommand.scala
@@ -17,17 +17,40 @@
 
 package org.apache.spark.sql.execution.command
 
+import java.util.Locale
+
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.connector.catalog.PathElement
+import org.apache.spark.sql.connector.catalog.CatalogManager
+import org.apache.spark.sql.connector.catalog.CatalogManager.{
+  CurrentSchemaEntry, LiteralPathEntry, SessionPathEntry
+}
+import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.internal.SQLConf
 
+/**
+ * Path element for SET PATH: either a well-known shortcut or a fully 
qualified schema reference.
+ * SchemaInPath requires at least 2 parts (catalog.namespace); multi-level 
namespaces are allowed.
+ */
+sealed trait PathElement
+
+object PathElement {
+  case object DefaultPath extends PathElement
+  case object SystemPath extends PathElement
+  case object PathRef extends PathElement
+  /**
+   * Current database/schema (SQL aliases). Stored as system.current_schema; 
expands when
+   * building resolution candidates so later USE SCHEMA is reflected.
+   */
+  case object CurrentDatabase extends PathElement
+  case object CurrentSchema extends PathElement
+  /** Fully qualified schema reference (catalog.namespace...). Must have at 
least 2 parts. */
+  case class SchemaInPath(parts: Seq[String]) extends PathElement
+}
+
 /**
  * Command for SET PATH = pathElement (, pathElement)*
  * Expands shortcuts at run time, validates no duplicates, and sets the 
internal session path.
- *
- * The [[PathElement]] AST and its expansion live in catalyst so that the same 
grammar can be
- * reused to parse the [[SQLConf.DEFAULT_PATH]] conf value.
  */
 case class SetPathCommand(elements: Seq[PathElement]) extends 
LeafRunnableCommand {
 
@@ -41,9 +64,23 @@ case class SetPathCommand(elements: Seq[PathElement]) 
extends LeafRunnableComman
     }
     val conf = sparkSession.sessionState.conf
     val catalogManager = sparkSession.sessionState.catalogManager
+    val currentCatalog = catalogManager.currentCatalog.name
+    val currentNamespace = catalogManager.currentNamespace.toSeq
+    val caseSensitive = conf.caseSensitiveAnalysis
 
-    val expanded0 = PathElement.expand(elements, conf, catalogManager)
-    val expanded = PathElement.validateNoStaticDuplicates(expanded0, 
conf.caseSensitiveAnalysis)
+    val expanded = expandPathElements(elements, conf, catalogManager)
+    val seen = new scala.collection.mutable.HashSet[Seq[String]]
+    expanded.foreach { entry =>
+      val concrete = entry.resolve(currentCatalog, currentNamespace)
+      def normalize(s: String): String = if (caseSensitive) s else 
s.toLowerCase(Locale.ROOT)
+      val key = concrete.map(normalize)
+      if (!seen.add(key)) {
+        throw new AnalysisException(
+          errorClass = "DUPLICATE_SQL_PATH_ENTRY",
+          messageParameters = Map("pathEntry" ->
+            concrete.map(p => if (p.contains(".")) s"`$p`" else 
p).mkString(".")))
+      }
+    }
 
     if (expanded.isEmpty) {
       catalogManager.clearSessionPath()
@@ -52,4 +89,36 @@ case class SetPathCommand(elements: Seq[PathElement]) 
extends LeafRunnableComman
     }
     Seq.empty
   }
+
+  private def expandPathElements(
+      elements: Seq[PathElement],
+      conf: SQLConf,
+      catalogManager: CatalogManager): Seq[SessionPathEntry] = {
+    val currentSchemaSentinel = Seq("__current_schema__")
+
+    def toEntries(parts: Seq[Seq[String]]): Seq[SessionPathEntry] = parts.map {
+      case p if p == currentSchemaSentinel => CurrentSchemaEntry
+      case p => LiteralPathEntry(p)
+    }
+
+    def defaultWithCurrentSchema: Seq[SessionPathEntry] =
+      toEntries(conf.defaultPathOrder(Seq(currentSchemaSentinel)))
+
+    elements.flatMap {
+      case PathElement.DefaultPath =>
+        defaultWithCurrentSchema
+      case PathElement.SystemPath =>
+        toEntries(conf.systemPathOrder)
+      case PathElement.CurrentDatabase | PathElement.CurrentSchema =>
+        Seq(CurrentSchemaEntry)
+      case PathElement.PathRef =>
+        catalogManager.sessionPathEntries.getOrElse(defaultWithCurrentSchema)
+      case PathElement.SchemaInPath(parts) =>
+        if (parts.length < 2) {
+          throw 
QueryCompilationErrors.invalidSqlPathSchemaReferenceError(parts.mkString("."))
+        }
+        Seq(LiteralPathEntry(parts))
+    }
+  }
+
 }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
index ff54b49eed3a..5929e5c56f90 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
@@ -167,13 +167,9 @@ case class DropFunctionCommand(
         identifier.funcName
       }
 
-      // Keep DROP TEMPORARY FUNCTION semantics consistent for unqualified 
names:
-      // - builtin name, no temp present, no IF EXISTS => FORBIDDEN_OPERATION
-      // - IF EXISTS => no-op
-      // Qualified temp namespaces (session / system.session) always target 
temp functions.
-      if (identifier.database.isEmpty &&
-          !ifExists &&
-          !catalog.isTemporaryFunction(FunctionIdentifier(funcName)) &&
+      // Check if temp function exists first - if it does, allow dropping it 
even if a builtin
+      // with the same name exists (shadowing case)
+      if (!catalog.isTemporaryFunction(FunctionIdentifier(funcName)) &&
           catalog.isBuiltinFunction(funcName)) {
         throw QueryCompilationErrors.cannotDropBuiltinFuncError(funcName)
       }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
index d2bac612c774..cc8c9dcb71f5 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.execution.{ColumnarRule, 
CommandExecutionMode, Query
 import org.apache.spark.sql.execution.adaptive.AdaptiveRulesHolder
 import org.apache.spark.sql.execution.aggregate.{ResolveEncodersInScalaAgg, 
ScalaUDAF}
 import org.apache.spark.sql.execution.analysis.DetectAmbiguousSelfJoin
-import org.apache.spark.sql.execution.command.{CheckViewReferences, 
CommandCheck, LeafRunnableCommandResolver}
+import org.apache.spark.sql.execution.command.{CheckViewReferences, 
CommandCheck}
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.v2.{TableCapabilityCheck, 
V2SessionCatalog}
 import org.apache.spark.sql.execution.streaming.runtime.ResolveWriteToStream
@@ -194,8 +194,7 @@ abstract class BaseSessionStateBuilder(
       customHintResolutionRules
 
     override val singlePassResolverExtensions: Seq[ResolverExtension] = Seq(
-      new LogicalRelationResolver,
-      new LeafRunnableCommandResolver
+      new LogicalRelationResolver
     )
 
     override val singlePassMetadataResolverExtensions: Seq[ResolverExtension] 
= Seq(
@@ -206,8 +205,6 @@ abstract class BaseSessionStateBuilder(
     override val singlePassPostHocResolutionRules: Seq[Rule[LogicalPlan]] =
       DetectAmbiguousSelfJoin +:
       ApplyCharTypePadding +:
-      ResolveSQLFunctions +:
-      ResolveDeserializer +:
       singlePassCustomPostHocResolutionRules
 
     override val singlePassExtendedResolutionChecks: Seq[LogicalPlan => Unit] 
= {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SetPathSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/SetPathSuite.scala
index f642393ea139..ad6bcd414966 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SetPathSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SetPathSuite.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.sql
 
-import org.apache.spark.SparkIllegalArgumentException
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
 
@@ -212,7 +211,7 @@ class SetPathSuite extends SharedSparkSession {
         },
         condition = "DUPLICATE_SQL_PATH_ENTRY",
         sqlState = Some("42732"),
-        parameters = Map("pathEntry" -> "current_schema"))
+        parameters = Map("pathEntry" -> "spark_catalog.default"))
     }
   }
 
@@ -232,17 +231,16 @@ class SetPathSuite extends SharedSparkSession {
     }
   }
 
-  test("PATH enabled: literal + CURRENT_SCHEMA collision is tolerated 
(USE-state dependent)") {
-    // SET PATH only rejects static duplicates (literal-vs-literal, 
current_schema repeated).
-    // A literal that happens to match the live current_schema is not flagged: 
a later
-    // `USE SCHEMA` may make them diverge, and at lookup the first match wins 
anyway.
-    // `system.builtin` is included so `current_path()` itself remains 
resolvable.
+  test("PATH enabled: duplicate after expanding CURRENT_SCHEMA") {
     withPathEnabled {
       sql("USE spark_catalog.default")
-      sql("SET PATH = spark_catalog.default, current_schema, system.builtin")
-      val entries = pathEntries(currentPath())
-      assert(entries === Seq("spark_catalog.default", "spark_catalog.default", 
"system.builtin"),
-        s"Expected literal + resolved CURRENT_SCHEMA preserved; got: $entries")
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql("SET PATH = spark_catalog.default, current_schema")
+        },
+        condition = "DUPLICATE_SQL_PATH_ENTRY",
+        sqlState = Some("42732"),
+        parameters = Map("pathEntry" -> "spark_catalog.default"))
     }
   }
 
@@ -255,7 +253,7 @@ class SetPathSuite extends SharedSparkSession {
         },
         condition = "DUPLICATE_SQL_PATH_ENTRY",
         sqlState = Some("42732"),
-        parameters = Map("pathEntry" -> "current_schema"))
+        parameters = Map("pathEntry" -> "spark_catalog.default"))
     }
   }
 
@@ -570,260 +568,4 @@ class SetPathSuite extends SharedSparkSession {
       }
     }
   }
-
-  // --- spark.sql.defaultPath (SQLConf.DEFAULT_PATH) ---
-  // The conf carries the SET PATH grammar; sessionPathEntries falls back to 
it lazily
-  // when no `SET PATH` has been issued, mirroring how `currentCatalog` falls 
back to
-  // [[SQLConf.DEFAULT_CATALOG]].
-
-  test("DEFAULT_PATH conf: lazy fallback when no SET PATH issued") {
-    withSQLConf(
-        SQLConf.PATH_ENABLED.key -> "true",
-        SQLConf.DEFAULT_PATH.key -> "spark_catalog.default, system.builtin") {
-      val catalogManager = spark.sessionState.catalogManager
-      val priorSessionPath = catalogManager.storedSessionPathEntries
-      catalogManager.clearSessionPath()
-      try {
-        val entries = pathEntries(currentPath())
-        assert(entries == Seq("spark_catalog.default", "system.builtin"),
-          s"Expected DEFAULT_PATH conf to drive current_path(); got: $entries")
-        assert(catalogManager.storedSessionPathEntries.isEmpty,
-          "DEFAULT_PATH lookup must not write to the in-memory stored session 
path")
-      } finally {
-        catalogManager.clearSessionPath()
-        priorSessionPath.foreach(catalogManager.setSessionPath)
-      }
-    }
-  }
-
-  test("DEFAULT_PATH conf: explicit SET PATH overrides the conf") {
-    withSQLConf(
-        SQLConf.PATH_ENABLED.key -> "true",
-        SQLConf.DEFAULT_PATH.key -> "system.builtin, system.session") {
-      val catalogManager = spark.sessionState.catalogManager
-      val priorSessionPath = catalogManager.storedSessionPathEntries
-      try {
-        sql("SET PATH = system.session, system.builtin")
-        val entries = pathEntries(currentPath())
-        assert(entries == Seq("system.session", "system.builtin"),
-          s"Expected SET PATH to win over DEFAULT_PATH conf; got: $entries")
-      } finally {
-        catalogManager.clearSessionPath()
-        priorSessionPath.foreach(catalogManager.setSessionPath)
-      }
-    }
-  }
-
-  test("DEFAULT_PATH conf: SET PATH = DEFAULT_PATH expands to the conf value") 
{
-    withSQLConf(
-        SQLConf.PATH_ENABLED.key -> "true",
-        SQLConf.DEFAULT_PATH.key -> "system.session, system.builtin, 
current_schema") {
-      val catalogManager = spark.sessionState.catalogManager
-      val priorSessionPath = catalogManager.storedSessionPathEntries
-      try {
-        sql("SET PATH = DEFAULT_PATH")
-        val entries = pathEntries(currentPath())
-        assert(entries.head.contains("system.session"),
-          s"DEFAULT_PATH expansion should follow conf order (session first); 
got: $entries")
-        assert(catalogManager.storedSessionPathEntries.isDefined,
-          "After SET PATH the in-memory stored session path should be 
populated")
-      } finally {
-        catalogManager.clearSessionPath()
-        priorSessionPath.foreach(catalogManager.setSessionPath)
-      }
-    }
-  }
-
-  test("DEFAULT_PATH conf: cycle break -- inner DEFAULT_PATH falls back to 
builtin order") {
-    withSQLConf(
-        SQLConf.PATH_ENABLED.key -> "true",
-        SQLConf.DEFAULT_PATH.key -> "DEFAULT_PATH",
-        // Pin order conf to "first" so the spark-builtin default ordering is 
observable.
-        SQLConf.SESSION_FUNCTION_RESOLUTION_ORDER.key -> "first") {
-      val catalogManager = spark.sessionState.catalogManager
-      val priorSessionPath = catalogManager.storedSessionPathEntries
-      catalogManager.clearSessionPath()
-      try {
-        val entries = pathEntries(currentPath())
-        assert(entries.head.contains("system.session"),
-          s"Inner DEFAULT_PATH should resolve to builtin order seeded by the 
order conf " +
-            s"('first' -> session leading); got: $entries")
-      } finally {
-        catalogManager.clearSessionPath()
-        priorSessionPath.foreach(catalogManager.setSessionPath)
-      }
-    }
-  }
-
-  test("DEFAULT_PATH conf: invalid value rejected on SET 
spark.sql.defaultPath") {
-    withPathEnabled {
-      val e = intercept[SparkIllegalArgumentException] {
-        sql("SET spark.sql.defaultPath = this is not a path")
-      }
-      assert(e.getCondition.startsWith("INVALID_CONF_VALUE"), e.getMessage)
-    }
-  }
-
-  test("DEFAULT_PATH conf: PATH keyword is rejected on SET 
spark.sql.defaultPath") {
-    withPathEnabled {
-      val e = intercept[SparkIllegalArgumentException] {
-        sql("SET spark.sql.defaultPath = PATH, system.builtin")
-      }
-      assert(e.getCondition.startsWith("INVALID_CONF_VALUE"), e.getMessage)
-    }
-  }
-
-  test("DEFAULT_PATH conf: PATH disabled returns no fallback") {
-    withSQLConf(
-        SQLConf.PATH_ENABLED.key -> "false",
-        SQLConf.DEFAULT_PATH.key -> "system.session, system.builtin") {
-      val catalogManager = spark.sessionState.catalogManager
-      assert(catalogManager.sessionPathEntries.isEmpty,
-        "DEFAULT_PATH conf must not take effect when PATH is disabled")
-    }
-  }
-
-  // --- Path-driven security check (built on the lazy DEFAULT_PATH fallback) 
---
-  // The "block temp function shadowing builtin" check is now driven by the 
live PATH, so
-  // changes via SET PATH or DEFAULT_PATH take effect even when the legacy 
order conf is
-  // left at its default.
-
-  test("path-driven security check: SET PATH putting session before builtin 
blocks temp " +
-      "function with a builtin name") {
-    withPathEnabled {
-      val catalogManager = spark.sessionState.catalogManager
-      val priorSessionPath = catalogManager.storedSessionPathEntries
-      try {
-        // Default `sessionFunctionResolutionOrder` is "second" (builtin 
first), but SET PATH
-        // overrides that to put session first. The security check must 
reflect the live path.
-        sql("SET PATH = system.session, system.builtin")
-        val e = intercept[AnalysisException] {
-          sql("CREATE TEMPORARY FUNCTION count() RETURNS INT RETURN 1")
-        }
-        assert(e.getCondition == "ROUTINE_ALREADY_EXISTS", e.getMessage)
-      } finally {
-        sql("DROP TEMPORARY FUNCTION IF EXISTS session.count")
-        catalogManager.clearSessionPath()
-        priorSessionPath.foreach(catalogManager.setSessionPath)
-      }
-    }
-  }
-
-  test("path-driven security check: DEFAULT_PATH conf putting session before 
builtin " +
-      "blocks temp function with a builtin name (no SET PATH issued)") {
-    withSQLConf(
-        SQLConf.PATH_ENABLED.key -> "true",
-        SQLConf.DEFAULT_PATH.key -> "system.session, system.builtin") {
-      val catalogManager = spark.sessionState.catalogManager
-      val priorSessionPath = catalogManager.storedSessionPathEntries
-      catalogManager.clearSessionPath()
-      try {
-        // Order conf is left at its default ("second"). The path-driven gate 
must read
-        // DEFAULT_PATH and fire the security check for unqualified 
temp/builtin collisions.
-        val e = intercept[AnalysisException] {
-          sql("CREATE TEMPORARY FUNCTION count() RETURNS INT RETURN 1")
-        }
-        assert(e.getCondition == "ROUTINE_ALREADY_EXISTS", e.getMessage)
-      } finally {
-        sql("DROP TEMPORARY FUNCTION IF EXISTS session.count")
-        catalogManager.clearSessionPath()
-        priorSessionPath.foreach(catalogManager.setSessionPath)
-      }
-    }
-  }
-
-  test("PATH enabled: SET PATH with only user schemas does not implicitly 
resolve builtins") {
-    withPathEnabled {
-      sql("CREATE SCHEMA IF NOT EXISTS only_user_on_path")
-      try {
-        sql("SET PATH = spark_catalog.only_user_on_path")
-        val e = intercept[AnalysisException] {
-          sql("SELECT abs(-1)").collect()
-        }
-        assert(e.getCondition == "UNRESOLVED_ROUTINE", e.getMessage)
-      } finally {
-        sql("DROP SCHEMA IF EXISTS only_user_on_path")
-      }
-    }
-  }
-
-  test("single-pass: count(*) rewrite respects PATH temp-before-builtin gate") 
{
-    withSQLConf(
-      SQLConf.PATH_ENABLED.key -> "true",
-      SQLConf.ANALYZER_SINGLE_PASS_RESOLVER_ENABLED.key -> "true") {
-      sql("SET PATH = system.builtin, system.session")
-      sql("CREATE TEMPORARY FUNCTION count(x INT) RETURNS INT RETURN x")
-      try {
-        sql("SET PATH = system.session, system.builtin")
-        checkAnswer(
-          sql("SELECT count(*) FROM VALUES (CAST(NULL AS INT)) AS t(c)"),
-          Row(null))
-      } finally {
-        sql("DROP TEMPORARY FUNCTION IF EXISTS session.count")
-      }
-    }
-  }
-
-  test("PATH enabled: explicit SET PATH with system.session AFTER a user 
catalog still " +
-      "reaches temp functions") {
-    // Explicit paths are honored as written: placing `system.session` after a 
user catalog
-    // is the user's authorization for unqualified temp functions to resolve. 
Contrast with
-    // the implicit (no SET PATH, no DEFAULT_PATH) form, which preserves the 
security property
-    // of the seeded default path.
-    withPathEnabled {
-      sql("CREATE SCHEMA IF NOT EXISTS path_interleaved_user")
-      try {
-        sql("CREATE TEMPORARY FUNCTION path_interleaved_temp() RETURNS INT 
RETURN 7")
-        try {
-          sql("SET PATH = system.builtin, spark_catalog.path_interleaved_user, 
system.session")
-          checkAnswer(sql("SELECT path_interleaved_temp()"), Row(7))
-        } finally {
-          sql("DROP TEMPORARY FUNCTION IF EXISTS path_interleaved_temp")
-        }
-      } finally {
-        sql("DROP SCHEMA IF EXISTS path_interleaved_user")
-      }
-    }
-  }
-
-  test("PATH enabled: SET PATH with user schema before system.builtin still 
resolves builtins") {
-    // Exercises systemFunctionKindsFromPath with a user-catalog entry 
preceding
-    // system.builtin: the helper flat-scans the path, so Builtin still appears
-    // in the kinds list and unqualified `abs` resolves.
-    withPathEnabled {
-      sql("CREATE SCHEMA IF NOT EXISTS path_user_before_builtin")
-      try {
-        sql("SET PATH = spark_catalog.path_user_before_builtin, 
system.builtin")
-        // `abs` is a builtin; if Builtin did not appear in the kinds list,
-        // unqualified `abs(-1)` would fail with UNRESOLVED_ROUTINE.
-        checkAnswer(sql("SELECT abs(-1)"), Row(1))
-      } finally {
-        sql("DROP SCHEMA IF EXISTS path_user_before_builtin")
-      }
-    }
-  }
-
-  test("DEFAULT_PATH conf: duplicate entries are tolerated (first-match 
resolution)") {
-    // Lookup uses first-match resolution, so redundant entries on 
DEFAULT_PATH are dead code
-    // rather than an error. (Contrast with SET PATH, which still rejects 
static duplicates as
-    // a user-input typo guard.) This avoids a UX cliff where a USE SCHEMA 
could later wedge
-    // every unqualified function lookup with DUPLICATE_SQL_PATH_ENTRY.
-    withSQLConf(
-        SQLConf.PATH_ENABLED.key -> "true",
-        SQLConf.DEFAULT_PATH.key -> "system.builtin, system.builtin") {
-      val catalogManager = spark.sessionState.catalogManager
-      val priorSessionPath = catalogManager.storedSessionPathEntries
-      catalogManager.clearSessionPath()
-      try {
-        val entries = pathEntries(currentPath())
-        assert(entries == Seq("system.builtin", "system.builtin"),
-          s"DEFAULT_PATH duplicates should pass through to current_path(); 
got: $entries")
-        // Sanity: unqualified resolution still works (the second 
`system.builtin` is dead).
-        checkAnswer(sql("SELECT abs(-1)"), Row(1))
-      } finally {
-        catalogManager.clearSessionPath()
-        priorSessionPath.foreach(catalogManager.setSessionPath)
-      }
-    }
-  }
 }
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
index 354473582678..0fbc41492e00 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
@@ -100,8 +100,6 @@ class HiveSessionStateBuilder(
     override val singlePassPostHocResolutionRules: Seq[Rule[LogicalPlan]] =
       DetectAmbiguousSelfJoin +:
       ApplyCharTypePadding +:
-      ResolveSQLFunctions +:
-      ResolveDeserializer +:
       singlePassCustomPostHocResolutionRules
 
     override val singlePassExtendedResolutionChecks: Seq[LogicalPlan => Unit] 
= {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to