This is an automated email from the ASF dual-hosted git repository.

cloud-fan pushed a commit to branch branch-4.2
in repository https://gitbox.apache.org/repos/asf/spark.git

commit 15777cbc47b0d2883f7304bab131e4e5257daf25
Author: Serge Rielau <[email protected]>
AuthorDate: Fri May 15 10:42:43 2026 -0700

    [SPARK-56853] Improve PATH Tests
    
    ### What changes were proposed in this pull request?
    
    This is a test-only PR that closes coverage gaps identified by a follow-up 
QA audit of the SQL Standard PATH feature delivered in SPARK-56489, 
SPARK-56501, SPARK-56518, SPARK-56520, SPARK-56605, SPARK-56639, SPARK-56681, 
and SPARK-56750. No product code is changed.
    
    New test surfaces:
    
    - **Spark Connect E2E** — `sql/connect/.../SqlPathE2ETestSuite.scala` 
(new): `SET PATH` and `current_path()` round-trip over the gRPC client; a 
persisted view created under one path resolves its body under the frozen path 
even when the invoker switches PATH; `SET PATH` is rejected with 
`UNSUPPORTED_FEATURE.SET_PATH_WHEN_DISABLED` over Connect when the feature flag 
is off. Previously, `ProtoToParsedPlanTestSuite` pinned `PATH_ENABLED=false` 
for analyzer isolation and nothing exercised  [...]
    - **PySpark API** — `python/pyspark/sql/tests/test_catalog.py`: three new 
methods added to `CatalogTestsMixin` (`test_path_current_path_disabled`, 
`test_path_set_path_and_current_path`, 
`test_path_set_path_rejected_when_disabled`). Because the mixin is shared with 
`test_parity_catalog.py`, the same tests run under classic PySpark and under 
Connect parity. Previously `pyspark.sql.functions.current_path` had no test 
calling it, and `SET PATH` was never exercised from Python.
    - **Golden SQL** — `sql-tests/inputs/sql-path.sql` + matching `results/` 
and `analyzer-results/` outputs. The file has a TOC and ten labeled sections: 
default path observability; SET PATH grammar (literal, DEFAULT_PATH, 
SYSTEM_PATH, PATH append, current_schema/current_database, backticks/case); 
CURRENT_PATH() and the ANSI no-parens form; the full set of static error 
conditions; routine resolution via PATH for scalar AND table functions; 
relation resolution via PATH; persisted view fro [...]
    - **`COUNT(*) → COUNT(1)` rewrite gate** — three tests in `SetPathSuite` 
exercise the path-driven gate: (1) the fixed-point analyzer 
(`Analyzer.matchesFunctionName` / 
`FunctionResolution.isSessionBeforeBuiltinInPath`); (2) the gate fires ONLY 
when a temp count exists (an unrelated temp must not affect the rewrite); (3) 
the single-pass-resolver counterpart 
(`FunctionResolverUtils.isUnqualifiedCountShadowedByTemp` via 
`isNonDistinctCount` / `handleStarInArguments`), gated under `spark.s [...]
    - **`ALTER VIEW ... WITH SCHEMA` preserves the frozen path** — new test in 
`v1.AlterViewSchemaBindingSuite` asserts that 
`generateViewProperties(captureNewPath=false)` keeps the persisted 
`VIEW_RESOLUTION_PATH` intact even when the caller's session PATH differs from 
the create-time path.
    - **Catalyst unit tests** — new `SqlPathFormatSuite` (`toDescribeJson` 
valid / malformed payloads, `formatForDisplay` quoting behavior, multi-level 
namespaces). `CatalogManagerSuite` is extended with direct 
`PathElement.validateNoStaticDuplicates` cases (case sensitivity, repeated 
`CurrentSchemaEntry`, literal-vs-`CurrentSchemaEntry` toleration, multi-part 
and dot-containing identifier error formatting) plus `serializePathEntries` 
round-tripping (incl. multi-level / quoted / space-con [...]
    - **PATH-disabled compat for persisted views** — two new tests in 
`SQLViewSuite` pin the documented behavior of 
`CatalogManager.resolutionPathEntriesForAnalysis` when 
`spark.sql.path.enabled=false`: the pinned frozen path is dropped and analysis 
falls back to the recorded `viewCatalogAndNamespace`. The fully-qualified body 
path keeps working; the unqualified body resolves through the recorded 
namespace (or raises `TABLE_OR_VIEW_NOT_FOUND` when nothing in scope matches).
    - **Concurrency smoke test** — new `SetPathSuite` case runs `SET PATH` 
(alternating between two paths) and unqualified `count(*)` lookups concurrently 
for 200 iterations each, validating the in-source claim that 
`SessionCatalog.lookupBuiltinOrTempFunction` is intentionally non-synchronized 
to avoid lock-order inversion with `CatalogManager.synchronized`.
    - **`cloneSession()` propagation matrix** — replaces the in-source `TODO` 
in `SetPathSuite` ("audit and pin down clone semantics in a follow-up") with 
six tests pinning what does and does not survive `spark.cloneSession()`. 
Confirmed: stored `SET PATH`, `USE SCHEMA`, temp views, and temp functions 
(because `functionRegistry.clone()` deep-copies them in 
`BaseSessionStateBuilder`) propagate; temp variables do not; a child's `SET 
PATH` does not leak back to the parent.
    - **V2 catalogs in `SET PATH`** — new `SqlPathV2CatalogSuite` registers two 
`InMemoryCatalog` instances and verifies first-match resolution for unqualified 
tables (id=10 vs id=20 distinguishes which catalog supplied the row) and for 
unqualified V2 functions (a tiny `StrLenTimes100` `ScalarFunction[Int]` 
returning `s.length * 100` is registered on `pathcat2`, so `strlen('abc')` 
returns 3 or 300 depending on path order), plus the negative case where the 
unqualified name only lives in a  [...]
    
    Explicitly deferred:
    
    - DESCRIBE FUNCTION rendering of the frozen path 
(`DescribeFunctionCommandUtils.storedResolutionPathString`). DESCRIBE FUNCTION 
needs broader improvements first; will land in a follow-up.
    
    ### Why are the changes needed?
    
    A QA audit of the PATH feature found several substantive gaps:
    
    - Spark Connect serialization of `AnalysisContext.resolutionPathEntries` 
was entirely untested; a regression dropping pinned entries from the proto plan 
would not have been caught.
    - The public `current_path()` PySpark function was documented (with a 
`+SKIP` doctest) but never called from any test.
    - The PATH feature had no `SQLQueryTestSuite` golden file, despite golden 
coverage being the project convention for SQL-level features of similar surface 
(CURRENT_SCHEMA, variables, identifier clauses).
    - The path-driven gate for `COUNT(*) → COUNT(1)` was a behavior change 
called out in the SPARK-56750 description but had no test that flipped PATH and 
verified the suppression -- in either the fixed-point or the single-pass 
resolver.
    - `ALTER VIEW ... WITH SCHEMA` preserving the frozen path was undocumented 
in tests.
    - `PathElement`, `SqlPathFormat`, and `CatalogManager.serializePathEntries` 
were only exercised end-to-end via SQL.
    - The PATH-disabled read path through a view that already persisted a 
frozen path was untested (a forward-compat scenario likely to occur during 
rolling upgrades).
    - The deadlock-safety claim in `SessionCatalog` about non-synchronized 
lookups was unvalidated.
    - The `cloneSession()` propagation matrix was flagged as "incidental" with 
an in-source TODO.
    - Real V2 catalogs in `SET PATH` were not exercised end-to-end (only stub 
`cat`/`cat2` for procedures).
    
    These tests pin the documented intent so future changes have to update the 
assertions deliberately.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No. Test-only changes.
    
    ### How was this patch tested?
    
    The new tests are themselves the patch. They all pass locally:
    
    - `build/sbt "connect-client-jvm/testOnly *SqlPathE2ETestSuite"`
    - `build/sbt "sql/testOnly org.apache.spark.sql.SQLQueryTestSuite -- -z 
sql-path.sql"` (and regenerated via `SPARK_GENERATE_GOLDEN_FILES=1`)
    - `build/sbt "sql/testOnly org.apache.spark.sql.SetPathSuite"`
    - `build/sbt "sql/testOnly 
org.apache.spark.sql.execution.command.v1.AlterViewSchemaBindingSuite"`
    - `build/sbt "sql/testOnly *SimpleSQLViewSuite -- -z SPARK-56853"`
    - `build/sbt "sql/testOnly *SqlPathV2CatalogSuite"`
    - `build/sbt "catalyst/testOnly *SqlPathFormatSuite *CatalogManagerSuite"`
    - `./dev/lint-scala` passes (Scalastyle and Scalafmt).
    
    PySpark tests (`test_path_*` in `CatalogTestsMixin` and the Connect parity 
variant) are syntactically validated; they are picked up automatically by the 
existing `pyspark.sql.tests.test_catalog` and 
`pyspark.sql.tests.connect.test_parity_catalog` modules registered in 
`dev/sparktestsupport/modules.py` and will run in CI.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Yes. Test design and implementation were iterated with coding-assistant 
tooling; the author reviewed and owns the final patch.
    
    Generated-by: Cursor with Claude Opus 4.7
    
    Closes #55866 from srielau/SPARK-56853-path-qa-gaps.
    
    Authored-by: Serge Rielau <[email protected]>
    Signed-off-by: Daniel Tenedorio <[email protected]>
---
 python/pyspark/sql/tests/test_catalog.py           |   41 +
 .../sql/catalyst/catalog/SqlPathFormatSuite.scala  |   98 ++
 .../connector/catalog/CatalogManagerSuite.scala    |  111 ++
 .../spark/sql/connect/SqlPathE2ETestSuite.scala    |   97 ++
 .../sql-tests/analyzer-results/sql-path.sql.out    | 1041 +++++++++++++++++
 .../test/resources/sql-tests/inputs/sql-path.sql   |  410 +++++++
 .../resources/sql-tests/results/sql-path.sql.out   | 1202 ++++++++++++++++++++
 .../scala/org/apache/spark/sql/SetPathSuite.scala  |  273 ++++-
 .../sql/connector/SqlPathV2CatalogSuite.scala      |  157 +++
 .../apache/spark/sql/execution/SQLViewSuite.scala  |  100 ++
 .../command/v1/AlterViewSchemaBindingSuite.scala   |   48 +-
 11 files changed, 3572 insertions(+), 6 deletions(-)

diff --git a/python/pyspark/sql/tests/test_catalog.py 
b/python/pyspark/sql/tests/test_catalog.py
index 92ffea233215..d832a9ffa7d0 100644
--- a/python/pyspark/sql/tests/test_catalog.py
+++ b/python/pyspark/sql/tests/test_catalog.py
@@ -588,6 +588,47 @@ class CatalogTestsMixin:
             spark.sql(f"INSERT INTO {t} VALUES (1)")
             spark.catalog.analyzeTable(t, noScan=True)
 
+    def test_path_current_path_disabled(self):
+        # current_path() is a regular builtin and resolves even when
+        # spark.sql.path.enabled is false. The DataFrame and SQL surfaces must 
agree.
+        from pyspark.sql.functions import current_path
+
+        spark = self.spark
+        with self.sql_conf({"spark.sql.path.enabled": False}):
+            sql_form = spark.sql("SELECT current_path()").collect()[0][0]
+            self.assertIsInstance(sql_form, str)
+            self.assertNotEqual(sql_form, "")
+            api_form = spark.range(1).select(current_path()).collect()[0][0]
+            self.assertEqual(sql_form, api_form)
+
+    def test_path_set_path_and_current_path(self):
+        # SET PATH is parsed and applied; current_path() reflects it
+        # over both the SQL and DataFrame surfaces. Restores DEFAULT_PATH on 
exit.
+        from pyspark.sql.functions import current_path
+
+        spark = self.spark
+        with self.sql_conf({"spark.sql.path.enabled": True}):
+            try:
+                spark.sql("SET PATH = spark_catalog.default, system.builtin")
+                sql_form = spark.sql("SELECT current_path()").collect()[0][0]
+                self.assertEqual(sql_form, 
"spark_catalog.default,system.builtin")
+                api_form = 
spark.range(1).select(current_path()).collect()[0][0]
+                self.assertEqual(sql_form, api_form)
+            finally:
+                spark.sql("SET PATH = DEFAULT_PATH")
+
+    def test_path_set_path_rejected_when_disabled(self):
+        # SET PATH must raise UNSUPPORTED_FEATURE.SET_PATH_WHEN_DISABLED
+        # when the feature flag is off (covers both classic and Connect error 
paths).
+        spark = self.spark
+        with self.sql_conf({"spark.sql.path.enabled": False}):
+            with self.assertRaises(AnalysisException) as ctx:
+                spark.sql("SET PATH = spark_catalog.default")
+            self.assertEqual(
+                ctx.exception.getCondition(),
+                "UNSUPPORTED_FEATURE.SET_PATH_WHEN_DISABLED",
+            )
+
 
 class CatalogTests(CatalogTestsMixin, ReusedSQLTestCase):
     pass
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SqlPathFormatSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SqlPathFormatSuite.scala
new file mode 100644
index 000000000000..0ed3bcfb1963
--- /dev/null
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SqlPathFormatSuite.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.catalog
+
+import org.json4s.JsonAST.{JArray, JObject, JString}
+import org.json4s.jackson.JsonMethods.{compact, render}
+
+import org.apache.spark.SparkFunSuite
+
+/**
+ * Unit tests for [[SqlPathFormat]] -- the helper that converts the raw 
JSON-array-of-arrays
+ * path stored on view / SQL function metadata into the JSON-object form used 
by DESCRIBE
+ * AS JSON and the human-readable form used by DESCRIBE EXTENDED.
+ */
+class SqlPathFormatSuite extends SparkFunSuite {
+
+  private def compactJson(v: JArray): String = compact(render(v))
+
+  test("toDescribeJson: maps each [catalog, ns...] entry to a JSON object") {
+    val stored =
+      """[["spark_catalog","default"],["system","builtin"]]"""
+    val result = SqlPathFormat.toDescribeJson(stored)
+      .getOrElse(fail(s"Expected a JSON value, got None for: $stored"))
+    val expected = JArray(List(
+      JObject("catalog_name" -> JString("spark_catalog"),
+        "namespace" -> JArray(List(JString("default")))),
+      JObject("catalog_name" -> JString("system"),
+        "namespace" -> JArray(List(JString("builtin"))))))
+    assert(compactJson(result.asInstanceOf[JArray]) == compactJson(expected))
+  }
+
+  test("toDescribeJson: multi-level namespace becomes [head, tail...]") {
+    val stored = """[["cat1","db","sub"]]"""
+    val result = SqlPathFormat.toDescribeJson(stored)
+      .getOrElse(fail("Expected a JSON value"))
+    val expected = JArray(List(
+      JObject("catalog_name" -> JString("cat1"),
+        "namespace" -> JArray(List(JString("db"), JString("sub"))))))
+    assert(compactJson(result.asInstanceOf[JArray]) == compactJson(expected))
+  }
+
+  test("toDescribeJson: empty array returns None") {
+    assert(SqlPathFormat.toDescribeJson("[]").isEmpty)
+  }
+
+  test("toDescribeJson: malformed payloads return None") {
+    Seq(
+      "",
+      "not_json",
+      "{}",
+      """{"foo":1}""",
+      """[1, 2, 3]"""
+    ).foreach { payload =>
+      assert(SqlPathFormat.toDescribeJson(payload).isEmpty, 
s"payload=$payload")
+    }
+  }
+
+  test("formatForDisplay: renders plain identifiers without backticks") {
+    val json = SqlPathFormat.toDescribeJson(
+      """[["spark_catalog","default"],["system","builtin"]]""")
+      .getOrElse(fail("Expected a JSON value"))
+    val rendered = SqlPathFormat.formatForDisplay(json)
+      .getOrElse(fail("Expected a display string"))
+    assert(rendered == "spark_catalog.default, system.builtin")
+  }
+
+  test("formatForDisplay: backticks identifiers that need quoting") {
+    val json = SqlPathFormat.toDescribeJson(
+      """[["spark_catalog","weird.schema"]]""")
+      .getOrElse(fail("Expected a JSON value"))
+    val rendered = SqlPathFormat.formatForDisplay(json)
+      .getOrElse(fail("Expected a display string"))
+    assert(rendered == "spark_catalog.`weird.schema`")
+  }
+
+  test("formatForDisplay: round-trips multi-level namespaces") {
+    val json = SqlPathFormat.toDescribeJson("""[["cat","db","ns"]]""")
+      .getOrElse(fail("Expected a JSON value"))
+    val rendered = SqlPathFormat.formatForDisplay(json)
+      .getOrElse(fail("Expected a display string"))
+    assert(rendered == "cat.db.ns")
+  }
+}
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogManagerSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogManagerSuite.scala
index acf86aae1eea..64b2ac91fbd6 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogManagerSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogManagerSuite.scala
@@ -22,9 +22,11 @@ import java.net.URI
 import scala.jdk.CollectionConverters._
 
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.{EmptyFunctionRegistry, 
FakeV2SessionCatalog, NoSuchNamespaceException}
 import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, InMemoryCatalog 
=> V1InMemoryCatalog, SessionCatalog}
 import org.apache.spark.sql.catalyst.plans.SQLHelper
+import 
org.apache.spark.sql.connector.catalog.CatalogManager.{CurrentSchemaEntry, 
LiteralPathEntry}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
@@ -150,6 +152,115 @@ class CatalogManagerSuite extends SparkFunSuite with 
SQLHelper {
       assert(CatalogManager.deserializePathEntries(payload).isEmpty, 
s"payload=$payload")
     }
   }
+
+  test("serializePathEntries round-trips through deserialize for typical 
inputs") {
+    val cases = Seq(
+      Seq(Seq("spark_catalog", "default"), Seq("system", "builtin")),
+      Seq(Seq("system", "session")),
+      Seq.empty[Seq[String]])
+    cases.foreach { entries =>
+      val payload = CatalogManager.serializePathEntries(entries)
+      val parsed = CatalogManager.deserializePathEntries(payload)
+        .getOrElse(fail(s"Expected payload to round-trip: $payload"))
+      assert(parsed === entries, s"Round-trip mismatch for $entries; got 
$parsed")
+    }
+  }
+
+  test("serializePathEntries round-trips multi-level and quoted identifiers") {
+    val entries = Seq(
+      Seq("cat", "ns1", "ns2"),
+      Seq("spark_catalog", "sch.with.dots"),
+      Seq("spark_catalog", "schema with spaces"))
+    val payload = CatalogManager.serializePathEntries(entries)
+    val parsed = CatalogManager.deserializePathEntries(payload)
+      .getOrElse(fail(s"Expected payload to round-trip: $payload"))
+    assert(parsed === entries)
+  }
+
+  test("deserializePathEntriesOrFail raises a clear AnalysisException for bad 
payloads") {
+    val e = intercept[AnalysisException] {
+      CatalogManager.deserializePathEntriesOrFail(
+        storedPathStr = "{bad-json",
+        objectType = "view",
+        objectName = "default.v_broken")
+    }
+    assert(e.getMessage.contains("Invalid stored SQL path metadata for view"))
+    assert(e.getMessage.contains("default.v_broken"))
+  }
+
+  // 
---------------------------------------------------------------------------
+  // Direct unit tests for [[PathElement.validateNoStaticDuplicates]]. The 
end-to-end
+  // `SetPathSuite` exercises this via SQL, but the duplicate-detection rules
+  // (literal-vs-literal, current_schema-vs-current_schema, case-sensitivity) 
are pure
+  // data and benefit from focused tests close to the implementation.
+  // 
---------------------------------------------------------------------------
+
+  private def literalEntry(parts: String*): LiteralPathEntry = 
LiteralPathEntry(parts.toSeq)
+
+  test("validateNoStaticDuplicates: no duplicates returns the input 
unchanged") {
+    val entries = Seq(
+      literalEntry("spark_catalog", "default"),
+      literalEntry("system", "builtin"),
+      CurrentSchemaEntry)
+    assert(PathElement.validateNoStaticDuplicates(entries, caseSensitive = 
false) === entries)
+  }
+
+  test("validateNoStaticDuplicates: duplicate literal under case-insensitive 
collation") {
+    val entries = Seq(
+      literalEntry("spark_catalog", "default"),
+      literalEntry("Spark_Catalog", "DEFAULT"))
+    val e = intercept[AnalysisException] {
+      PathElement.validateNoStaticDuplicates(entries, caseSensitive = false)
+    }
+    assert(e.getCondition == "DUPLICATE_SQL_PATH_ENTRY")
+    assert(e.getMessageParameters.get("pathEntry") == "Spark_Catalog.DEFAULT")
+  }
+
+  test("validateNoStaticDuplicates: case-sensitive mode keeps differently 
cased entries") {
+    val entries = Seq(
+      literalEntry("spark_catalog", "DEFAULT"),
+      literalEntry("spark_catalog", "default"))
+    assert(PathElement.validateNoStaticDuplicates(entries, caseSensitive = 
true) === entries)
+  }
+
+  test("validateNoStaticDuplicates: repeated CurrentSchemaEntry is rejected") {
+    val entries = Seq(CurrentSchemaEntry, CurrentSchemaEntry)
+    val e = intercept[AnalysisException] {
+      PathElement.validateNoStaticDuplicates(entries, caseSensitive = false)
+    }
+    assert(e.getCondition == "DUPLICATE_SQL_PATH_ENTRY")
+    assert(e.getMessageParameters.get("pathEntry") == "current_schema")
+  }
+
+  test("validateNoStaticDuplicates: literal-vs-CurrentSchemaEntry collision is 
tolerated") {
+    // The CurrentSchemaEntry marker resolves dynamically against USE SCHEMA, 
so a literal
+    // that happens to match the live current schema is intentionally not 
flagged here.
+    val entries = Seq(
+      literalEntry("spark_catalog", "default"),
+      CurrentSchemaEntry,
+      literalEntry("system", "builtin"))
+    assert(PathElement.validateNoStaticDuplicates(entries, caseSensitive = 
false) === entries)
+  }
+
+  test("validateNoStaticDuplicates: identifier containing a dot is quoted in 
the error") {
+    val entries = Seq(
+      literalEntry("spark_catalog", "weird.schema"),
+      literalEntry("spark_catalog", "weird.schema"))
+    val e = intercept[AnalysisException] {
+      PathElement.validateNoStaticDuplicates(entries, caseSensitive = false)
+    }
+    assert(e.getMessageParameters.get("pathEntry") == 
"spark_catalog.`weird.schema`")
+  }
+
+  test("validateNoStaticDuplicates: multi-level namespace duplicate is 
flagged") {
+    val entries = Seq(
+      literalEntry("cat", "db", "ns"),
+      literalEntry("cat", "db", "ns"))
+    val e = intercept[AnalysisException] {
+      PathElement.validateNoStaticDuplicates(entries, caseSensitive = false)
+    }
+    assert(e.getMessageParameters.get("pathEntry") == "cat.db.ns")
+  }
 }
 
 class DummyCatalog extends CatalogPlugin {
diff --git 
a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/SqlPathE2ETestSuite.scala
 
b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/SqlPathE2ETestSuite.scala
new file mode 100644
index 000000000000..88ed1f31c86a
--- /dev/null
+++ 
b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/SqlPathE2ETestSuite.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.connect.test.{ConnectFunSuite, RemoteSparkSession, 
SQLHelper}
+import org.apache.spark.sql.functions.current_path
+
+/**
+ * End-to-end coverage for the SQL Standard PATH feature over Spark Connect.
+ *
+ * SET PATH and the frozen-path semantics for persisted views / SQL functions 
are implemented
+ * entirely server-side, but the analyzer state (`AnalysisContext`) that 
carries the pinned path
+ * must survive plan reification across the gRPC boundary. These tests run the 
public surface over
+ * a real Connect client so regressions there are caught:
+ *   - `SET PATH = ...` is parsed and applied to the session,
+ *   - `current_path()` (SQL and the DataFrame builtin) reflects it,
+ *   - a persisted view created under one path resolves its body under the 
frozen path even when
+ *     the invoker switches the session path.
+ */
+class SqlPathE2ETestSuite extends ConnectFunSuite with RemoteSparkSession with 
SQLHelper {
+
+  test("SET PATH and current_path() round-trip over Connect") {
+    withSQLConf("spark.sql.path.enabled" -> "true") {
+      try {
+        spark.sql("SET PATH = spark_catalog.default, system.builtin")
+        val sqlPath = spark.sql("SELECT current_path()").head().getString(0)
+        assert(
+          sqlPath == "spark_catalog.default,system.builtin",
+          s"current_path() over Connect should reflect SET PATH; got: 
$sqlPath")
+
+        // DataFrame builtin should agree with the SQL form.
+        val apiPath = spark.range(1).select(current_path()).head().getString(0)
+        assert(
+          apiPath == sqlPath,
+          s"functions.current_path() should match SQL current_path(); got: 
$apiPath vs $sqlPath")
+      } finally {
+        spark.sql("SET PATH = DEFAULT_PATH")
+      }
+    }
+  }
+
+  test("Persisted view body uses frozen path over Connect") {
+    withSQLConf("spark.sql.path.enabled" -> "true") {
+      withDatabase("connect_path_a", "connect_path_b") {
+        spark.sql("CREATE DATABASE connect_path_a")
+        spark.sql("CREATE DATABASE connect_path_b")
+        spark.sql("CREATE TABLE connect_path_a.frozen_t USING parquet AS 
SELECT 1 AS id")
+        spark.sql("CREATE TABLE connect_path_b.frozen_t USING parquet AS 
SELECT 2 AS id")
+        withView("default.v_path_connect") {
+          try {
+            // Create the view under PATH=a.
+            spark.sql("SET PATH = spark_catalog.connect_path_a, 
system.builtin")
+            spark.sql("CREATE VIEW default.v_path_connect AS SELECT id FROM 
frozen_t")
+
+            // Switch the session path to b; bare `frozen_t` now resolves 
through b,
+            // but the view's frozen path keeps it pinned to a.
+            spark.sql("SET PATH = spark_catalog.connect_path_b, 
system.builtin")
+            val bare = spark.sql("SELECT id FROM frozen_t").head().getInt(0)
+            assert(bare == 2, s"Bare `frozen_t` should follow live PATH=b; 
got: $bare")
+            val viaView = spark.sql("SELECT id FROM 
default.v_path_connect").head().getInt(0)
+            assert(
+              viaView == 1,
+              s"View body should resolve via the frozen creation-time PATH; 
got: $viaView")
+          } finally {
+            spark.sql("SET PATH = DEFAULT_PATH")
+          }
+        }
+      }
+    }
+  }
+
+  test("SET PATH is rejected over Connect when feature is disabled") {
+    withSQLConf("spark.sql.path.enabled" -> "false") {
+      val ex = intercept[AnalysisException] {
+        spark.sql("SET PATH = spark_catalog.default")
+      }
+      assert(
+        ex.getCondition == "UNSUPPORTED_FEATURE.SET_PATH_WHEN_DISABLED",
+        s"Expected SET_PATH_WHEN_DISABLED, got: ${ex.getCondition}")
+    }
+  }
+}
diff --git 
a/sql/core/src/test/resources/sql-tests/analyzer-results/sql-path.sql.out 
b/sql/core/src/test/resources/sql-tests/analyzer-results/sql-path.sql.out
new file mode 100644
index 000000000000..3a494d1cd3b7
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/analyzer-results/sql-path.sql.out
@@ -0,0 +1,1041 @@
+-- Automatically generated by SQLQueryTestSuite
+-- !query
+SELECT current_path()
+-- !query analysis
+Project [current_path() AS current_path()#x]
++- OneRowRelation
+
+
+-- !query
+SET PATH = spark_catalog.default, system.builtin
+-- !query analysis
+SetPathCommand [SchemaInPath(List(spark_catalog, default)), 
SchemaInPath(List(system, builtin))]
+
+
+-- !query
+SELECT current_path()
+-- !query analysis
+Project [current_path() AS current_path()#x]
++- OneRowRelation
+
+
+-- !query
+SET PATH = Spark_Catalog.Default, System.Builtin
+-- !query analysis
+SetPathCommand [SchemaInPath(List(Spark_Catalog, Default)), 
SchemaInPath(List(System, Builtin))]
+
+
+-- !query
+SELECT current_path()
+-- !query analysis
+Project [current_path() AS current_path()#x]
++- OneRowRelation
+
+
+-- !query
+SET PATH = spark_catalog.`sch.b`, system.builtin
+-- !query analysis
+SetPathCommand [SchemaInPath(List(spark_catalog, sch.b)), 
SchemaInPath(List(system, builtin))]
+
+
+-- !query
+SELECT current_path()
+-- !query analysis
+Project [current_path() AS current_path()#x]
++- OneRowRelation
+
+
+-- !query
+SET PATH = DEFAULT_PATH
+-- !query analysis
+SetPathCommand [DefaultPath]
+
+
+-- !query
+SET PATH = DEFAULT_PATH
+-- !query analysis
+SetPathCommand [DefaultPath]
+
+
+-- !query
+SELECT current_path()
+-- !query analysis
+Project [current_path() AS current_path()#x]
++- OneRowRelation
+
+
+-- !query
+SET PATH = SYSTEM_PATH
+-- !query analysis
+SetPathCommand [SystemPath]
+
+
+-- !query
+SELECT current_path()
+-- !query analysis
+Project [current_path() AS current_path()#x]
++- OneRowRelation
+
+
+-- !query
+SET PATH = spark_catalog.default, system.builtin
+-- !query analysis
+SetPathCommand [SchemaInPath(List(spark_catalog, default)), 
SchemaInPath(List(system, builtin))]
+
+
+-- !query
+SET PATH = PATH, system.session
+-- !query analysis
+SetPathCommand [PathRef, SchemaInPath(List(system, session))]
+
+
+-- !query
+SELECT current_path()
+-- !query analysis
+Project [current_path() AS current_path()#x]
++- OneRowRelation
+
+
+-- !query
+USE spark_catalog.default
+-- !query analysis
+SetCatalogAndNamespace
++- ResolvedNamespace V2SessionCatalog(spark_catalog), [default]
+
+
+-- !query
+SET PATH = current_schema, system.builtin
+-- !query analysis
+SetPathCommand [CurrentSchema, SchemaInPath(List(system, builtin))]
+
+
+-- !query
+SELECT current_path()
+-- !query analysis
+Project [current_path() AS current_path()#x]
++- OneRowRelation
+
+
+-- !query
+SET PATH = current_database, system.builtin
+-- !query analysis
+SetPathCommand [CurrentSchema, SchemaInPath(List(system, builtin))]
+
+
+-- !query
+SELECT current_path()
+-- !query analysis
+Project [current_path() AS current_path()#x]
++- OneRowRelation
+
+
+-- !query
+SET PATH = DEFAULT_PATH
+-- !query analysis
+SetPathCommand [DefaultPath]
+
+
+-- !query
+SET PATH = spark_catalog.default, system.builtin
+-- !query analysis
+SetPathCommand [SchemaInPath(List(spark_catalog, default)), 
SchemaInPath(List(system, builtin))]
+
+
+-- !query
+SELECT CURRENT_PATH = current_path() AS ansi_form_matches
+-- !query analysis
+Project [(current_path() = current_path()) AS ansi_form_matches#x]
++- OneRowRelation
+
+
+-- !query
+SELECT current_path(1)
+-- !query analysis
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "WRONG_NUM_ARGS.WITHOUT_SUGGESTION",
+  "sqlState" : "42605",
+  "messageParameters" : {
+    "actualNum" : "1",
+    "docroot" : "https://spark.apache.org/docs/latest";,
+    "expectedNum" : "0",
+    "functionName" : "`current_path`"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 22,
+    "fragment" : "current_path(1)"
+  } ]
+}
+
+
+-- !query
+SET PATH = DEFAULT_PATH
+-- !query analysis
+SetPathCommand [DefaultPath]
+
+
+-- !query
+SET PATH = spark_catalog.default, spark_catalog.default
+-- !query analysis
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "DUPLICATE_SQL_PATH_ENTRY",
+  "sqlState" : "42732",
+  "messageParameters" : {
+    "pathEntry" : "spark_catalog.default"
+  }
+}
+
+
+-- !query
+SET PATH = spark_catalog.DEFAULT, spark_catalog.default
+-- !query analysis
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "DUPLICATE_SQL_PATH_ENTRY",
+  "sqlState" : "42732",
+  "messageParameters" : {
+    "pathEntry" : "spark_catalog.default"
+  }
+}
+
+
+-- !query
+SET PATH = DEFAULT_PATH, system.builtin
+-- !query analysis
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "DUPLICATE_SQL_PATH_ENTRY",
+  "sqlState" : "42732",
+  "messageParameters" : {
+    "pathEntry" : "system.builtin"
+  }
+}
+
+
+-- !query
+SET PATH = SYSTEM_PATH, SYSTEM_PATH
+-- !query analysis
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "DUPLICATE_SQL_PATH_ENTRY",
+  "sqlState" : "42732",
+  "messageParameters" : {
+    "pathEntry" : "system.builtin"
+  }
+}
+
+
+-- !query
+SET PATH = current_database, current_schema
+-- !query analysis
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "DUPLICATE_SQL_PATH_ENTRY",
+  "sqlState" : "42732",
+  "messageParameters" : {
+    "pathEntry" : "current_schema"
+  }
+}
+
+
+-- !query
+SET PATH = my_schema_no_catalog
+-- !query analysis
+org.apache.spark.sql.catalyst.parser.ParseException
+{
+  "errorClass" : "INVALID_SQL_PATH_SCHEMA_REFERENCE",
+  "sqlState" : "42601",
+  "messageParameters" : {
+    "qualifiedName" : "my_schema_no_catalog"
+  }
+}
+
+
+-- !query
+CREATE SCHEMA sql_path_routines
+-- !query analysis
+CreateNamespace false
++- ResolvedNamespace V2SessionCatalog(spark_catalog), [sql_path_routines]
+
+
+-- !query
+CREATE FUNCTION sql_path_routines.pick() RETURNS INT RETURN 7
+-- !query analysis
+CreateSQLFunctionCommand spark_catalog.sql_path_routines.pick, INT, 7, false, 
false, false, false
+
+
+-- !query
+SET PATH = spark_catalog.sql_path_routines, spark_catalog.default, 
system.builtin
+-- !query analysis
+SetPathCommand [SchemaInPath(List(spark_catalog, sql_path_routines)), 
SchemaInPath(List(spark_catalog, default)), SchemaInPath(List(system, builtin))]
+
+
+-- !query
+SELECT pick()
+-- !query analysis
+Project [spark_catalog.sql_path_routines.pick() AS 
spark_catalog.sql_path_routines.pick()#x]
++- Project
+   +- OneRowRelation
+
+
+-- !query
+SET PATH = DEFAULT_PATH
+-- !query analysis
+SetPathCommand [DefaultPath]
+
+
+-- !query
+CREATE FUNCTION sql_path_routines.pick_tvf()
+RETURNS TABLE(val INT)
+RETURN SELECT 7 AS val
+-- !query analysis
+CreateSQLFunctionCommand spark_catalog.sql_path_routines.pick_tvf, val INT, 
SELECT 7 AS val, true, false, false, false
+
+
+-- !query
+SET PATH = spark_catalog.sql_path_routines, spark_catalog.default, 
system.builtin
+-- !query analysis
+SetPathCommand [SchemaInPath(List(spark_catalog, sql_path_routines)), 
SchemaInPath(List(spark_catalog, default)), SchemaInPath(List(system, builtin))]
+
+
+-- !query
+SELECT * FROM pick_tvf()
+-- !query analysis
+Project [val#x]
++- SQLFunctionNode spark_catalog.sql_path_routines.pick_tvf
+   +- SubqueryAlias pick_tvf
+      +- Project [cast(val#x as int) AS val#x]
+         +- Project [7 AS val#x]
+            +- OneRowRelation
+
+
+-- !query
+SET PATH = DEFAULT_PATH
+-- !query analysis
+SetPathCommand [DefaultPath]
+
+
+-- !query
+CREATE SCHEMA sql_path_routines_b
+-- !query analysis
+CreateNamespace false
++- ResolvedNamespace V2SessionCatalog(spark_catalog), [sql_path_routines_b]
+
+
+-- !query
+CREATE FUNCTION sql_path_routines_b.pick() RETURNS INT RETURN 11
+-- !query analysis
+CreateSQLFunctionCommand spark_catalog.sql_path_routines_b.pick, INT, 11, 
false, false, false, false
+
+
+-- !query
+SET PATH = spark_catalog.sql_path_routines, spark_catalog.sql_path_routines_b, 
system.builtin
+-- !query analysis
+SetPathCommand [SchemaInPath(List(spark_catalog, sql_path_routines)), 
SchemaInPath(List(spark_catalog, sql_path_routines_b)), 
SchemaInPath(List(system, builtin))]
+
+
+-- !query
+SELECT pick() AS from_first_schema
+-- !query analysis
+Project [spark_catalog.sql_path_routines.pick() AS from_first_schema#x]
++- Project
+   +- OneRowRelation
+
+
+-- !query
+SET PATH = spark_catalog.sql_path_routines_b, spark_catalog.sql_path_routines, 
system.builtin
+-- !query analysis
+SetPathCommand [SchemaInPath(List(spark_catalog, sql_path_routines_b)), 
SchemaInPath(List(spark_catalog, sql_path_routines)), SchemaInPath(List(system, 
builtin))]
+
+
+-- !query
+SELECT pick() AS from_first_schema
+-- !query analysis
+Project [spark_catalog.sql_path_routines_b.pick() AS from_first_schema#x]
++- Project
+   +- OneRowRelation
+
+
+-- !query
+SET PATH = DEFAULT_PATH
+-- !query analysis
+SetPathCommand [DefaultPath]
+
+
+-- !query
+SET PATH = spark_catalog.default, system.builtin
+-- !query analysis
+SetPathCommand [SchemaInPath(List(spark_catalog, default)), 
SchemaInPath(List(system, builtin))]
+
+
+-- !query
+SELECT pick()
+-- !query analysis
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "UNRESOLVED_ROUTINE",
+  "sqlState" : "42883",
+  "messageParameters" : {
+    "routineName" : "`pick`",
+    "searchPath" : "[`spark_catalog`.`default`, `system`.`builtin`]"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 13,
+    "fragment" : "pick()"
+  } ]
+}
+
+
+-- !query
+SET PATH = DEFAULT_PATH
+-- !query analysis
+SetPathCommand [DefaultPath]
+
+
+-- !query
+DROP FUNCTION sql_path_routines.pick
+-- !query analysis
+DropFunctionCommand spark_catalog.sql_path_routines.pick, false, false
+
+
+-- !query
+DROP FUNCTION sql_path_routines.pick_tvf
+-- !query analysis
+DropFunctionCommand spark_catalog.sql_path_routines.pick_tvf, false, false
+
+
+-- !query
+DROP FUNCTION sql_path_routines_b.pick
+-- !query analysis
+DropFunctionCommand spark_catalog.sql_path_routines_b.pick, false, false
+
+
+-- !query
+DROP SCHEMA sql_path_routines
+-- !query analysis
+DropNamespace false, false
++- ResolvedNamespace V2SessionCatalog(spark_catalog), [sql_path_routines]
+
+
+-- !query
+DROP SCHEMA sql_path_routines_b
+-- !query analysis
+DropNamespace false, false
++- ResolvedNamespace V2SessionCatalog(spark_catalog), [sql_path_routines_b]
+
+
+-- !query
+CREATE SCHEMA sql_path_relations_a
+-- !query analysis
+CreateNamespace false
++- ResolvedNamespace V2SessionCatalog(spark_catalog), [sql_path_relations_a]
+
+
+-- !query
+CREATE SCHEMA sql_path_relations_b
+-- !query analysis
+CreateNamespace false
++- ResolvedNamespace V2SessionCatalog(spark_catalog), [sql_path_relations_b]
+
+
+-- !query
+CREATE TABLE sql_path_relations_a.tbl USING parquet AS SELECT 1 AS id
+-- !query analysis
+CreateDataSourceTableAsSelectCommand 
`spark_catalog`.`sql_path_relations_a`.`tbl`, ErrorIfExists, [id]
+   +- Project [1 AS id#x]
+      +- OneRowRelation
+
+
+-- !query
+CREATE TABLE sql_path_relations_b.tbl USING parquet AS SELECT 2 AS id
+-- !query analysis
+CreateDataSourceTableAsSelectCommand 
`spark_catalog`.`sql_path_relations_b`.`tbl`, ErrorIfExists, [id]
+   +- Project [2 AS id#x]
+      +- OneRowRelation
+
+
+-- !query
+SET PATH = spark_catalog.sql_path_relations_a, 
spark_catalog.sql_path_relations_b, system.builtin
+-- !query analysis
+SetPathCommand [SchemaInPath(List(spark_catalog, sql_path_relations_a)), 
SchemaInPath(List(spark_catalog, sql_path_relations_b)), 
SchemaInPath(List(system, builtin))]
+
+
+-- !query
+SELECT id FROM tbl AS from_first_schema
+-- !query analysis
+Project [id#x]
++- SubqueryAlias from_first_schema
+   +- SubqueryAlias spark_catalog.sql_path_relations_a.tbl
+      +- Relation spark_catalog.sql_path_relations_a.tbl[id#x] parquet
+
+
+-- !query
+SET PATH = spark_catalog.sql_path_relations_b, 
spark_catalog.sql_path_relations_a, system.builtin
+-- !query analysis
+SetPathCommand [SchemaInPath(List(spark_catalog, sql_path_relations_b)), 
SchemaInPath(List(spark_catalog, sql_path_relations_a)), 
SchemaInPath(List(system, builtin))]
+
+
+-- !query
+SELECT id FROM tbl AS from_first_schema
+-- !query analysis
+Project [id#x]
++- SubqueryAlias from_first_schema
+   +- SubqueryAlias spark_catalog.sql_path_relations_b.tbl
+      +- Relation spark_catalog.sql_path_relations_b.tbl[id#x] parquet
+
+
+-- !query
+SET PATH = spark_catalog.default, system.builtin
+-- !query analysis
+SetPathCommand [SchemaInPath(List(spark_catalog, default)), 
SchemaInPath(List(system, builtin))]
+
+
+-- !query
+SELECT id FROM tbl
+-- !query analysis
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "TABLE_OR_VIEW_NOT_FOUND",
+  "sqlState" : "42P01",
+  "messageParameters" : {
+    "relationName" : "`tbl`",
+    "searchPath" : "[`spark_catalog`.`default`, `system`.`builtin`]"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 16,
+    "stopIndex" : 18,
+    "fragment" : "tbl"
+  } ]
+}
+
+
+-- !query
+SET PATH = DEFAULT_PATH
+-- !query analysis
+SetPathCommand [DefaultPath]
+
+
+-- !query
+DROP TABLE sql_path_relations_a.tbl
+-- !query analysis
+DropTable false, false
++- ResolvedIdentifier V2SessionCatalog(spark_catalog), sql_path_relations_a.tbl
+
+
+-- !query
+DROP TABLE sql_path_relations_b.tbl
+-- !query analysis
+DropTable false, false
++- ResolvedIdentifier V2SessionCatalog(spark_catalog), sql_path_relations_b.tbl
+
+
+-- !query
+DROP SCHEMA sql_path_relations_a
+-- !query analysis
+DropNamespace false, false
++- ResolvedNamespace V2SessionCatalog(spark_catalog), [sql_path_relations_a]
+
+
+-- !query
+DROP SCHEMA sql_path_relations_b
+-- !query analysis
+DropNamespace false, false
++- ResolvedNamespace V2SessionCatalog(spark_catalog), [sql_path_relations_b]
+
+
+-- !query
+CREATE SCHEMA sql_path_views_a
+-- !query analysis
+CreateNamespace false
++- ResolvedNamespace V2SessionCatalog(spark_catalog), [sql_path_views_a]
+
+
+-- !query
+CREATE SCHEMA sql_path_views_b
+-- !query analysis
+CreateNamespace false
++- ResolvedNamespace V2SessionCatalog(spark_catalog), [sql_path_views_b]
+
+
+-- !query
+CREATE TABLE sql_path_views_a.frozen_t USING parquet AS SELECT 1 AS id
+-- !query analysis
+CreateDataSourceTableAsSelectCommand 
`spark_catalog`.`sql_path_views_a`.`frozen_t`, ErrorIfExists, [id]
+   +- Project [1 AS id#x]
+      +- OneRowRelation
+
+
+-- !query
+CREATE TABLE sql_path_views_b.frozen_t USING parquet AS SELECT 2 AS id
+-- !query analysis
+CreateDataSourceTableAsSelectCommand 
`spark_catalog`.`sql_path_views_b`.`frozen_t`, ErrorIfExists, [id]
+   +- Project [2 AS id#x]
+      +- OneRowRelation
+
+
+-- !query
+SET PATH = spark_catalog.sql_path_views_a, system.builtin
+-- !query analysis
+SetPathCommand [SchemaInPath(List(spark_catalog, sql_path_views_a)), 
SchemaInPath(List(system, builtin))]
+
+
+-- !query
+CREATE VIEW default.v_path_frozen AS SELECT id FROM frozen_t
+-- !query analysis
+CreateViewCommand `spark_catalog`.`default`.`v_path_frozen`, SELECT id FROM 
frozen_t, false, false, PersistedView, COMPENSATION, true
+   +- Project [id#x]
+      +- SubqueryAlias spark_catalog.sql_path_views_a.frozen_t
+         +- Relation spark_catalog.sql_path_views_a.frozen_t[id#x] parquet
+
+
+-- !query
+SET PATH = spark_catalog.sql_path_views_b, system.builtin
+-- !query analysis
+SetPathCommand [SchemaInPath(List(spark_catalog, sql_path_views_b)), 
SchemaInPath(List(system, builtin))]
+
+
+-- !query
+SELECT id FROM frozen_t AS bare_lookup_uses_live_path
+-- !query analysis
+Project [id#x]
++- SubqueryAlias bare_lookup_uses_live_path
+   +- SubqueryAlias spark_catalog.sql_path_views_b.frozen_t
+      +- Relation spark_catalog.sql_path_views_b.frozen_t[id#x] parquet
+
+
+-- !query
+SELECT id FROM default.v_path_frozen AS view_body_uses_frozen_path
+-- !query analysis
+Project [id#x]
++- SubqueryAlias view_body_uses_frozen_path
+   +- SubqueryAlias spark_catalog.default.v_path_frozen
+      +- View (`spark_catalog`.`default`.`v_path_frozen`, [id#x])
+         +- Project [cast(id#x as int) AS id#x]
+            +- Project [id#x]
+               +- SubqueryAlias spark_catalog.sql_path_views_a.frozen_t
+                  +- Relation spark_catalog.sql_path_views_a.frozen_t[id#x] 
parquet
+
+
+-- !query
+USE spark_catalog.sql_path_views_a
+-- !query analysis
+SetCatalogAndNamespace
++- ResolvedNamespace V2SessionCatalog(spark_catalog), [sql_path_views_a]
+
+
+-- !query
+CREATE VIEW sql_path_views_a.v_ctx AS
+SELECT current_schema() AS cs, current_path() AS cp
+-- !query analysis
+CreateViewCommand `spark_catalog`.`sql_path_views_a`.`v_ctx`, SELECT 
current_schema() AS cs, current_path() AS cp, false, false, PersistedView, 
COMPENSATION, true
+   +- Project [current_schema() AS cs#x, current_path() AS cp#x]
+      +- OneRowRelation
+
+
+-- !query
+USE spark_catalog.sql_path_views_b
+-- !query analysis
+SetCatalogAndNamespace
++- ResolvedNamespace V2SessionCatalog(spark_catalog), [sql_path_views_b]
+
+
+-- !query
+SET PATH = DEFAULT_PATH
+-- !query analysis
+SetPathCommand [DefaultPath]
+
+
+-- !query
+SELECT cs, cp FROM sql_path_views_a.v_ctx
+-- !query analysis
+Project [cs#x, cp#x]
++- SubqueryAlias spark_catalog.sql_path_views_a.v_ctx
+   +- View (`spark_catalog`.`sql_path_views_a`.`v_ctx`, [cs#x, cp#x])
+      +- Project [cast(cs#x as string) AS cs#x, cast(cp#x as string) AS cp#x]
+         +- Project [current_schema() AS cs#x, current_path() AS cp#x]
+            +- OneRowRelation
+
+
+-- !query
+USE spark_catalog.default
+-- !query analysis
+SetCatalogAndNamespace
++- ResolvedNamespace V2SessionCatalog(spark_catalog), [default]
+
+
+-- !query
+SET PATH = DEFAULT_PATH
+-- !query analysis
+SetPathCommand [DefaultPath]
+
+
+-- !query
+DROP VIEW default.v_path_frozen
+-- !query analysis
+DropTableCommand `spark_catalog`.`default`.`v_path_frozen`, false, true, false
+
+
+-- !query
+DROP VIEW sql_path_views_a.v_ctx
+-- !query analysis
+DropTableCommand `spark_catalog`.`sql_path_views_a`.`v_ctx`, false, true, false
+
+
+-- !query
+DROP TABLE sql_path_views_a.frozen_t
+-- !query analysis
+DropTable false, false
++- ResolvedIdentifier V2SessionCatalog(spark_catalog), 
sql_path_views_a.frozen_t
+
+
+-- !query
+DROP TABLE sql_path_views_b.frozen_t
+-- !query analysis
+DropTable false, false
++- ResolvedIdentifier V2SessionCatalog(spark_catalog), 
sql_path_views_b.frozen_t
+
+
+-- !query
+DROP SCHEMA sql_path_views_a
+-- !query analysis
+DropNamespace false, false
++- ResolvedNamespace V2SessionCatalog(spark_catalog), [sql_path_views_a]
+
+
+-- !query
+DROP SCHEMA sql_path_views_b
+-- !query analysis
+DropNamespace false, false
++- ResolvedNamespace V2SessionCatalog(spark_catalog), [sql_path_views_b]
+
+
+-- !query
+CREATE SCHEMA sql_path_fn_a
+-- !query analysis
+CreateNamespace false
++- ResolvedNamespace V2SessionCatalog(spark_catalog), [sql_path_fn_a]
+
+
+-- !query
+CREATE SCHEMA sql_path_fn_b
+-- !query analysis
+CreateNamespace false
++- ResolvedNamespace V2SessionCatalog(spark_catalog), [sql_path_fn_b]
+
+
+-- !query
+CREATE TABLE sql_path_fn_a.frozen_t USING parquet AS SELECT 10 AS id
+-- !query analysis
+CreateDataSourceTableAsSelectCommand 
`spark_catalog`.`sql_path_fn_a`.`frozen_t`, ErrorIfExists, [id]
+   +- Project [10 AS id#x]
+      +- OneRowRelation
+
+
+-- !query
+CREATE TABLE sql_path_fn_b.frozen_t USING parquet AS SELECT 20 AS id
+-- !query analysis
+CreateDataSourceTableAsSelectCommand 
`spark_catalog`.`sql_path_fn_b`.`frozen_t`, ErrorIfExists, [id]
+   +- Project [20 AS id#x]
+      +- OneRowRelation
+
+
+-- !query
+SET PATH = spark_catalog.sql_path_fn_a, system.builtin
+-- !query analysis
+SetPathCommand [SchemaInPath(List(spark_catalog, sql_path_fn_a)), 
SchemaInPath(List(system, builtin))]
+
+
+-- !query
+CREATE FUNCTION default.frozen_fn()
+RETURNS INT
+RETURN (SELECT MAX(id) FROM frozen_t)
+-- !query analysis
+CreateSQLFunctionCommand spark_catalog.default.frozen_fn, INT, (SELECT MAX(id) 
FROM frozen_t), false, false, false, false
+
+
+-- !query
+SET PATH = spark_catalog.sql_path_fn_b, system.builtin
+-- !query analysis
+SetPathCommand [SchemaInPath(List(spark_catalog, sql_path_fn_b)), 
SchemaInPath(List(system, builtin))]
+
+
+-- !query
+SELECT MAX(id) FROM frozen_t AS bare_lookup_uses_live_path
+-- !query analysis
+Aggregate [max(id#x) AS max(id)#x]
++- SubqueryAlias bare_lookup_uses_live_path
+   +- SubqueryAlias spark_catalog.sql_path_fn_b.frozen_t
+      +- Relation spark_catalog.sql_path_fn_b.frozen_t[id#x] parquet
+
+
+-- !query
+SELECT default.frozen_fn() AS scalar_body_uses_frozen_path
+-- !query analysis
+Project [spark_catalog.default.frozen_fn() AS scalar_body_uses_frozen_path#x]
+:  +- Aggregate [max(id#x) AS max(id)#x]
+:     +- SubqueryAlias spark_catalog.sql_path_fn_a.frozen_t
+:        +- Relation spark_catalog.sql_path_fn_a.frozen_t[id#x] parquet
++- Project
+   +- OneRowRelation
+
+
+-- !query
+SET PATH = spark_catalog.sql_path_fn_a, system.builtin
+-- !query analysis
+SetPathCommand [SchemaInPath(List(spark_catalog, sql_path_fn_a)), 
SchemaInPath(List(system, builtin))]
+
+
+-- !query
+CREATE FUNCTION default.frozen_tvf()
+RETURNS TABLE(id INT)
+RETURN SELECT MAX(id) AS id FROM frozen_t
+-- !query analysis
+CreateSQLFunctionCommand spark_catalog.default.frozen_tvf, id INT, SELECT 
MAX(id) AS id FROM frozen_t, true, false, false, false
+
+
+-- !query
+SET PATH = spark_catalog.sql_path_fn_b, system.builtin
+-- !query analysis
+SetPathCommand [SchemaInPath(List(spark_catalog, sql_path_fn_b)), 
SchemaInPath(List(system, builtin))]
+
+
+-- !query
+SELECT * FROM default.frozen_tvf() AS table_body_uses_frozen_path
+-- !query analysis
+Project [id#x]
++- SubqueryAlias table_body_uses_frozen_path
+   +- SQLFunctionNode spark_catalog.default.frozen_tvf
+      +- SubqueryAlias frozen_tvf
+         +- Project [cast(id#x as int) AS id#x]
+            +- Aggregate [max(id#x) AS id#x]
+               +- SubqueryAlias spark_catalog.sql_path_fn_a.frozen_t
+                  +- Relation spark_catalog.sql_path_fn_a.frozen_t[id#x] 
parquet
+
+
+-- !query
+USE spark_catalog.sql_path_fn_a
+-- !query analysis
+SetCatalogAndNamespace
++- ResolvedNamespace V2SessionCatalog(spark_catalog), [sql_path_fn_a]
+
+
+-- !query
+CREATE FUNCTION sql_path_fn_a.f_ctx()
+RETURNS STRING
+RETURN concat(current_schema(), '::', current_path())
+-- !query analysis
+CreateSQLFunctionCommand spark_catalog.sql_path_fn_a.f_ctx, STRING, 
concat(current_schema(), '::', current_path()), false, false, false, false
+
+
+-- !query
+USE spark_catalog.sql_path_fn_b
+-- !query analysis
+SetCatalogAndNamespace
++- ResolvedNamespace V2SessionCatalog(spark_catalog), [sql_path_fn_b]
+
+
+-- !query
+SET PATH = DEFAULT_PATH
+-- !query analysis
+SetPathCommand [DefaultPath]
+
+
+-- !query
+SELECT sql_path_fn_a.f_ctx() AS invoker_context
+-- !query analysis
+Project [spark_catalog.sql_path_fn_a.f_ctx() AS invoker_context#x]
++- Project
+   +- OneRowRelation
+
+
+-- !query
+USE spark_catalog.default
+-- !query analysis
+SetCatalogAndNamespace
++- ResolvedNamespace V2SessionCatalog(spark_catalog), [default]
+
+
+-- !query
+SET PATH = DEFAULT_PATH
+-- !query analysis
+SetPathCommand [DefaultPath]
+
+
+-- !query
+DROP FUNCTION default.frozen_fn
+-- !query analysis
+DropFunctionCommand spark_catalog.default.frozen_fn, false, false
+
+
+-- !query
+DROP FUNCTION default.frozen_tvf
+-- !query analysis
+DropFunctionCommand spark_catalog.default.frozen_tvf, false, false
+
+
+-- !query
+DROP FUNCTION sql_path_fn_a.f_ctx
+-- !query analysis
+DropFunctionCommand spark_catalog.sql_path_fn_a.f_ctx, false, false
+
+
+-- !query
+DROP TABLE sql_path_fn_a.frozen_t
+-- !query analysis
+DropTable false, false
++- ResolvedIdentifier V2SessionCatalog(spark_catalog), sql_path_fn_a.frozen_t
+
+
+-- !query
+DROP TABLE sql_path_fn_b.frozen_t
+-- !query analysis
+DropTable false, false
++- ResolvedIdentifier V2SessionCatalog(spark_catalog), sql_path_fn_b.frozen_t
+
+
+-- !query
+DROP SCHEMA sql_path_fn_a
+-- !query analysis
+DropNamespace false, false
++- ResolvedNamespace V2SessionCatalog(spark_catalog), [sql_path_fn_a]
+
+
+-- !query
+DROP SCHEMA sql_path_fn_b
+-- !query analysis
+DropNamespace false, false
++- ResolvedNamespace V2SessionCatalog(spark_catalog), [sql_path_fn_b]
+
+
+-- !query
+SET spark.sql.defaultPath = system.session, system.builtin
+-- !query analysis
+SetCommand (spark.sql.defaultPath,Some(system.session, system.builtin))
+
+
+-- !query
+SET PATH = system.builtin, system.session
+-- !query analysis
+SetPathCommand [SchemaInPath(List(system, builtin)), SchemaInPath(List(system, 
session))]
+
+
+-- !query
+SELECT current_path() AS explicit_set_path_wins_over_conf
+-- !query analysis
+Project [current_path() AS explicit_set_path_wins_over_conf#x]
++- OneRowRelation
+
+
+-- !query
+SET PATH = DEFAULT_PATH
+-- !query analysis
+SetPathCommand [DefaultPath]
+
+
+-- !query
+RESET spark.sql.defaultPath
+-- !query analysis
+ResetCommand spark.sql.defaultPath
+
+
+-- !query
+SET spark.sql.defaultPath = system.session, system.builtin, current_schema
+-- !query analysis
+SetCommand (spark.sql.defaultPath,Some(system.session, system.builtin, 
current_schema))
+
+
+-- !query
+USE spark_catalog.default
+-- !query analysis
+SetCatalogAndNamespace
++- ResolvedNamespace V2SessionCatalog(spark_catalog), [default]
+
+
+-- !query
+SET PATH = DEFAULT_PATH
+-- !query analysis
+SetPathCommand [DefaultPath]
+
+
+-- !query
+SELECT current_path() AS default_path_expands_to_conf
+-- !query analysis
+Project [current_path() AS default_path_expands_to_conf#x]
++- OneRowRelation
+
+
+-- !query
+RESET spark.sql.defaultPath
+-- !query analysis
+ResetCommand spark.sql.defaultPath
+
+
+-- !query
+SET PATH = DEFAULT_PATH
+-- !query analysis
+SetPathCommand [DefaultPath]
+
+
+-- !query
+SET spark.sql.defaultPath = this is not a path
+-- !query analysis
+org.apache.spark.SparkIllegalArgumentException
+{
+  "errorClass" : "INVALID_CONF_VALUE.REQUIREMENT",
+  "sqlState" : "22022",
+  "messageParameters" : {
+    "confName" : "spark.sql.defaultPath",
+    "confRequirement" : "The value must be empty or a comma-separated SET PATH 
element list (same grammar as SET PATH, except PATH is not allowed).",
+    "confValue" : "this is not a path"
+  }
+}
+
+
+-- !query
+SET spark.sql.defaultPath = PATH, system.builtin
+-- !query analysis
+org.apache.spark.SparkIllegalArgumentException
+{
+  "errorClass" : "INVALID_CONF_VALUE.REQUIREMENT",
+  "sqlState" : "22022",
+  "messageParameters" : {
+    "confName" : "spark.sql.defaultPath",
+    "confRequirement" : "The value must be empty or a comma-separated SET PATH 
element list (same grammar as SET PATH, except PATH is not allowed).",
+    "confValue" : "PATH, system.builtin"
+  }
+}
+
+
+-- !query
+SET spark.sql.path.enabled = false
+-- !query analysis
+SetCommand (spark.sql.path.enabled,Some(false))
+
+
+-- !query
+SELECT current_path() IS NOT NULL AS has_path
+-- !query analysis
+Project [isnotnull(current_path()) AS has_path#x]
++- OneRowRelation
+
+
+-- !query
+SET PATH = spark_catalog.default
+-- !query analysis
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "UNSUPPORTED_FEATURE.SET_PATH_WHEN_DISABLED",
+  "sqlState" : "0A000",
+  "messageParameters" : {
+    "config" : "spark.sql.path.enabled"
+  }
+}
diff --git a/sql/core/src/test/resources/sql-tests/inputs/sql-path.sql 
b/sql/core/src/test/resources/sql-tests/inputs/sql-path.sql
new file mode 100644
index 000000000000..e9d1d149e7fa
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/inputs/sql-path.sql
@@ -0,0 +1,410 @@
+-- ============================================================================
+-- SQL Standard PATH golden coverage
+-- ============================================================================
+--
+-- This file is the readable, SQL-level reference for what the PATH feature
+-- does. It is the primary place to look up "how does SET PATH behave when
+-- I write ..." before reaching for the Scala unit suites. Tests that need
+-- features not expressible in pure SQL (multi-threaded execution, session
+-- cloning, view-metadata inspection, Connect/PySpark plumbing) live in the
+-- matching Scala / Python suites.
+--
+-- Table of Contents
+-- -----------------
+--   1. Default path observability (no SET PATH issued)
+--   2. SET PATH grammar
+--      2.1 Literal schema entries; case preservation; backtick quoting
+--      2.2 DEFAULT_PATH shortcut
+--      2.3 SYSTEM_PATH shortcut
+--      2.4 PATH keyword (append to live path)
+--      2.5 current_schema / current_database shortcuts
+--   3. CURRENT_PATH() builtin
+--      3.1 ANSI no-parens form equals current_path()
+--      3.2 Argument-count validation
+--   4. Static error conditions at SET PATH
+--      4.1 Literal duplicate
+--      4.2 DEFAULT_PATH expansion duplicate
+--      4.3 SYSTEM_PATH expansion duplicate
+--      4.4 current_database vs current_schema cross-alias duplicate
+--      4.5 Single-part schema reference rejected
+--   5. Routine resolution via PATH
+--      5.1 Persistent scalar function follows PATH
+--      5.2 Persistent table function follows PATH
+--      5.3 First-match ordering across two schemas on PATH
+--      5.4 Unqualified miss when schema is not on PATH
+--   6. Relation resolution via PATH
+--      6.1 Table resolved via PATH; first-match ordering
+--      6.2 Unqualified miss when schema is not on PATH
+--   7. Persisted view frozen-path behavior
+--      7.1 View body resolves via creation-time PATH (not invoker PATH)
+--      7.2 current_schema / current_path in view body use invoker context
+--   8. SQL function frozen-path behavior
+--      8.1 Scalar function body resolves via creation-time PATH
+--      8.2 Table function body resolves via creation-time PATH
+--      8.3 current_schema / current_path in function body use invoker context
+--   9. DEFAULT_PATH conf (spark.sql.defaultPath)
+--      9.1 Explicit SET PATH overrides the conf
+--      9.2 SET PATH = DEFAULT_PATH expands to the conf value
+--      9.3 Invalid conf value rejected
+--  10. PATH disabled
+--      10.1 current_path() still resolves (regular builtin)
+--      10.2 SET PATH itself is rejected
+-- ============================================================================
+
+--SET spark.sql.path.enabled=true
+
+
+-- ============================================================================
+-- 1. Default path observability (no SET PATH issued)
+-- ============================================================================
+
+-- The session was opened with PATH enabled and no `SET PATH` issued, so the
+-- effective path is the spark-builtin default ordering with current_schema in
+-- the catalog slot.
+SELECT current_path();
+
+
+-- ============================================================================
+-- 2. SET PATH grammar
+-- ============================================================================
+
+-- 2.1 Literal schema entries; case preservation; backtick quoting 
-------------
+
+SET PATH = spark_catalog.default, system.builtin;
+SELECT current_path();
+
+-- Case is preserved exactly as typed.
+SET PATH = Spark_Catalog.Default, System.Builtin;
+SELECT current_path();
+
+-- Backtick-quoted identifiers that contain dots round-trip with quoting.
+SET PATH = spark_catalog.`sch.b`, system.builtin;
+SELECT current_path();
+
+-- Multi-level namespace (3+ parts) is accepted by the grammar. The stored 
entry
+-- is verified at the Scala layer (SetPathSuite) because the session catalog
+-- only supports single-part namespaces, so calling current_path() while a
+-- multi-level entry is on the path would surface that catalog limitation
+-- rather than the PATH grammar property under test here.
+
+SET PATH = DEFAULT_PATH;
+
+
+-- 2.2 DEFAULT_PATH shortcut 
---------------------------------------------------
+
+SET PATH = DEFAULT_PATH;
+SELECT current_path();
+
+
+-- 2.3 SYSTEM_PATH shortcut 
----------------------------------------------------
+
+SET PATH = SYSTEM_PATH;
+SELECT current_path();
+
+
+-- 2.4 PATH keyword (append to live path) 
--------------------------------------
+
+SET PATH = spark_catalog.default, system.builtin;
+SET PATH = PATH, system.session;
+SELECT current_path();
+
+
+-- 2.5 current_schema / current_database shortcuts 
-----------------------------
+
+USE spark_catalog.default;
+SET PATH = current_schema, system.builtin;
+SELECT current_path();
+
+-- current_database is a SQL alias for current_schema.
+SET PATH = current_database, system.builtin;
+SELECT current_path();
+
+SET PATH = DEFAULT_PATH;
+
+
+-- ============================================================================
+-- 3. CURRENT_PATH() builtin
+-- ============================================================================
+
+-- 3.1 ANSI no-parens form equals current_path() ------------------------------
+
+SET PATH = spark_catalog.default, system.builtin;
+SELECT CURRENT_PATH = current_path() AS ansi_form_matches;
+
+
+-- 3.2 Argument-count validation ----------------------------------------------
+
+SELECT current_path(1);
+
+SET PATH = DEFAULT_PATH;
+
+
+-- ============================================================================
+-- 4. Static error conditions at SET PATH
+-- ============================================================================
+
+-- 4.1 Literal duplicate 
-------------------------------------------------------
+
+SET PATH = spark_catalog.default, spark_catalog.default;
+
+-- Case-insensitive duplicate is still flagged.
+SET PATH = spark_catalog.DEFAULT, spark_catalog.default;
+
+
+-- 4.2 DEFAULT_PATH expansion duplicate 
----------------------------------------
+
+-- DEFAULT_PATH already contains system.builtin; listing it again is a 
duplicate
+-- after expansion.
+SET PATH = DEFAULT_PATH, system.builtin;
+
+
+-- 4.3 SYSTEM_PATH expansion duplicate 
-----------------------------------------
+
+SET PATH = SYSTEM_PATH, SYSTEM_PATH;
+
+
+-- 4.4 current_database vs current_schema cross-alias duplicate 
----------------
+
+SET PATH = current_database, current_schema;
+
+
+-- 4.5 Single-part schema reference rejected 
-----------------------------------
+
+SET PATH = my_schema_no_catalog;
+
+
+-- ============================================================================
+-- 5. Routine resolution via PATH
+-- ============================================================================
+
+-- 5.1 Persistent scalar function follows PATH 
---------------------------------
+
+CREATE SCHEMA sql_path_routines;
+CREATE FUNCTION sql_path_routines.pick() RETURNS INT RETURN 7;
+SET PATH = spark_catalog.sql_path_routines, spark_catalog.default, 
system.builtin;
+SELECT pick();
+SET PATH = DEFAULT_PATH;
+
+
+-- 5.2 Persistent table function follows PATH 
----------------------------------
+
+CREATE FUNCTION sql_path_routines.pick_tvf()
+RETURNS TABLE(val INT)
+RETURN SELECT 7 AS val;
+SET PATH = spark_catalog.sql_path_routines, spark_catalog.default, 
system.builtin;
+SELECT * FROM pick_tvf();
+SET PATH = DEFAULT_PATH;
+
+
+-- 5.3 First-match ordering across two schemas on PATH ------------------------
+
+CREATE SCHEMA sql_path_routines_b;
+CREATE FUNCTION sql_path_routines_b.pick() RETURNS INT RETURN 11;
+
+SET PATH = spark_catalog.sql_path_routines, spark_catalog.sql_path_routines_b, 
system.builtin;
+SELECT pick() AS from_first_schema;
+SET PATH = spark_catalog.sql_path_routines_b, spark_catalog.sql_path_routines, 
system.builtin;
+SELECT pick() AS from_first_schema;
+SET PATH = DEFAULT_PATH;
+
+
+-- 5.4 Unqualified miss when schema is not on PATH 
-----------------------------
+
+SET PATH = spark_catalog.default, system.builtin;
+SELECT pick();
+
+-- Cleanup section 5.
+SET PATH = DEFAULT_PATH;
+DROP FUNCTION sql_path_routines.pick;
+DROP FUNCTION sql_path_routines.pick_tvf;
+DROP FUNCTION sql_path_routines_b.pick;
+DROP SCHEMA sql_path_routines;
+DROP SCHEMA sql_path_routines_b;
+
+
+-- ============================================================================
+-- 6. Relation resolution via PATH
+-- ============================================================================
+
+CREATE SCHEMA sql_path_relations_a;
+CREATE SCHEMA sql_path_relations_b;
+CREATE TABLE sql_path_relations_a.tbl USING parquet AS SELECT 1 AS id;
+CREATE TABLE sql_path_relations_b.tbl USING parquet AS SELECT 2 AS id;
+
+-- 6.1 First-match ordering 
----------------------------------------------------
+
+SET PATH = spark_catalog.sql_path_relations_a, 
spark_catalog.sql_path_relations_b, system.builtin;
+SELECT id FROM tbl AS from_first_schema;
+SET PATH = spark_catalog.sql_path_relations_b, 
spark_catalog.sql_path_relations_a, system.builtin;
+SELECT id FROM tbl AS from_first_schema;
+
+
+-- 6.2 Unqualified miss when schema is not on PATH 
-----------------------------
+
+SET PATH = spark_catalog.default, system.builtin;
+SELECT id FROM tbl;
+
+-- Cleanup section 6.
+SET PATH = DEFAULT_PATH;
+DROP TABLE sql_path_relations_a.tbl;
+DROP TABLE sql_path_relations_b.tbl;
+DROP SCHEMA sql_path_relations_a;
+DROP SCHEMA sql_path_relations_b;
+
+
+-- ============================================================================
+-- 7. Persisted view frozen-path behavior
+-- ============================================================================
+
+CREATE SCHEMA sql_path_views_a;
+CREATE SCHEMA sql_path_views_b;
+CREATE TABLE sql_path_views_a.frozen_t USING parquet AS SELECT 1 AS id;
+CREATE TABLE sql_path_views_b.frozen_t USING parquet AS SELECT 2 AS id;
+
+-- 7.1 View body resolves via creation-time PATH (not invoker PATH) 
------------
+
+SET PATH = spark_catalog.sql_path_views_a, system.builtin;
+CREATE VIEW default.v_path_frozen AS SELECT id FROM frozen_t;
+
+-- Flip the live PATH; the view body's unqualified `frozen_t` must still
+-- resolve through the schema captured at CREATE VIEW (sql_path_views_a, id=1).
+-- A bare query against `frozen_t` from the session follows the LIVE PATH and
+-- returns the other table's row (id=2).
+SET PATH = spark_catalog.sql_path_views_b, system.builtin;
+SELECT id FROM frozen_t AS bare_lookup_uses_live_path;
+SELECT id FROM default.v_path_frozen AS view_body_uses_frozen_path;
+
+
+-- 7.2 current_schema / current_path in view body use invoker context 
----------
+
+USE spark_catalog.sql_path_views_a;
+CREATE VIEW sql_path_views_a.v_ctx AS
+SELECT current_schema() AS cs, current_path() AS cp;
+
+USE spark_catalog.sql_path_views_b;
+SET PATH = DEFAULT_PATH;
+-- The view body re-evaluates current_schema() / current_path() on every
+-- invocation against the INVOKER's context, not the creator's. The result
+-- here must reflect sql_path_views_b (the invoker), not sql_path_views_a
+-- (the creator's schema at CREATE VIEW).
+SELECT cs, cp FROM sql_path_views_a.v_ctx;
+
+-- Cleanup section 7.
+USE spark_catalog.default;
+SET PATH = DEFAULT_PATH;
+DROP VIEW default.v_path_frozen;
+DROP VIEW sql_path_views_a.v_ctx;
+DROP TABLE sql_path_views_a.frozen_t;
+DROP TABLE sql_path_views_b.frozen_t;
+DROP SCHEMA sql_path_views_a;
+DROP SCHEMA sql_path_views_b;
+
+
+-- ============================================================================
+-- 8. SQL function frozen-path behavior
+-- ============================================================================
+
+CREATE SCHEMA sql_path_fn_a;
+CREATE SCHEMA sql_path_fn_b;
+CREATE TABLE sql_path_fn_a.frozen_t USING parquet AS SELECT 10 AS id;
+CREATE TABLE sql_path_fn_b.frozen_t USING parquet AS SELECT 20 AS id;
+
+-- 8.1 Scalar function body resolves via creation-time PATH 
--------------------
+
+SET PATH = spark_catalog.sql_path_fn_a, system.builtin;
+CREATE FUNCTION default.frozen_fn()
+RETURNS INT
+RETURN (SELECT MAX(id) FROM frozen_t);
+
+SET PATH = spark_catalog.sql_path_fn_b, system.builtin;
+SELECT MAX(id) FROM frozen_t AS bare_lookup_uses_live_path;
+SELECT default.frozen_fn() AS scalar_body_uses_frozen_path;
+
+
+-- 8.2 Table function body resolves via creation-time PATH 
---------------------
+
+SET PATH = spark_catalog.sql_path_fn_a, system.builtin;
+CREATE FUNCTION default.frozen_tvf()
+RETURNS TABLE(id INT)
+RETURN SELECT MAX(id) AS id FROM frozen_t;
+
+SET PATH = spark_catalog.sql_path_fn_b, system.builtin;
+SELECT * FROM default.frozen_tvf() AS table_body_uses_frozen_path;
+
+
+-- 8.3 current_schema / current_path in function body use invoker context -----
+
+USE spark_catalog.sql_path_fn_a;
+CREATE FUNCTION sql_path_fn_a.f_ctx()
+RETURNS STRING
+RETURN concat(current_schema(), '::', current_path());
+
+USE spark_catalog.sql_path_fn_b;
+SET PATH = DEFAULT_PATH;
+-- Like 7.2: current_schema() / current_path() in a SQL function body bind to
+-- the INVOKER's context, not the creator's.
+SELECT sql_path_fn_a.f_ctx() AS invoker_context;
+
+-- Cleanup section 8.
+USE spark_catalog.default;
+SET PATH = DEFAULT_PATH;
+DROP FUNCTION default.frozen_fn;
+DROP FUNCTION default.frozen_tvf;
+DROP FUNCTION sql_path_fn_a.f_ctx;
+DROP TABLE sql_path_fn_a.frozen_t;
+DROP TABLE sql_path_fn_b.frozen_t;
+DROP SCHEMA sql_path_fn_a;
+DROP SCHEMA sql_path_fn_b;
+
+
+-- ============================================================================
+-- 9. DEFAULT_PATH conf (spark.sql.defaultPath)
+-- ============================================================================
+--
+-- The conf's RHS is captured as a raw string by the SQL `SET key = value`
+-- form; keywords like `current_schema` and shortcut tokens like `SYSTEM_PATH`
+-- must be written WITHOUT backticks so the conf's SET-PATH-grammar validator
+-- recognizes them as path tokens rather than 1-part quoted identifiers.
+
+-- 9.1 Explicit SET PATH overrides the conf 
------------------------------------
+
+SET spark.sql.defaultPath = system.session, system.builtin;
+SET PATH = system.builtin, system.session;
+SELECT current_path() AS explicit_set_path_wins_over_conf;
+SET PATH = DEFAULT_PATH;
+RESET spark.sql.defaultPath;
+
+
+-- 9.2 SET PATH = DEFAULT_PATH expands to the conf value 
-----------------------
+
+SET spark.sql.defaultPath = system.session, system.builtin, current_schema;
+USE spark_catalog.default;
+SET PATH = DEFAULT_PATH;
+SELECT current_path() AS default_path_expands_to_conf;
+RESET spark.sql.defaultPath;
+SET PATH = DEFAULT_PATH;
+
+
+-- 9.3 Invalid conf value rejected at SET time 
---------------------------------
+
+SET spark.sql.defaultPath = this is not a path;
+
+-- The PATH keyword is not allowed in the conf value (it would create a cycle).
+SET spark.sql.defaultPath = PATH, system.builtin;
+
+
+-- ============================================================================
+-- 10. PATH disabled
+-- ============================================================================
+
+SET spark.sql.path.enabled = false;
+
+
+-- 10.1 current_path() still resolves (regular builtin) 
------------------------
+
+SELECT current_path() IS NOT NULL AS has_path;
+
+
+-- 10.2 SET PATH itself is rejected 
--------------------------------------------
+
+SET PATH = spark_catalog.default;
diff --git a/sql/core/src/test/resources/sql-tests/results/sql-path.sql.out 
b/sql/core/src/test/resources/sql-tests/results/sql-path.sql.out
new file mode 100644
index 000000000000..52d01ccb80ba
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/results/sql-path.sql.out
@@ -0,0 +1,1202 @@
+-- Automatically generated by SQLQueryTestSuite
+-- !query
+SELECT current_path()
+-- !query schema
+struct<current_path():string>
+-- !query output
+system.builtin,system.session,spark_catalog.default
+
+
+-- !query
+SET PATH = spark_catalog.default, system.builtin
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+SELECT current_path()
+-- !query schema
+struct<current_path():string>
+-- !query output
+spark_catalog.default,system.builtin
+
+
+-- !query
+SET PATH = Spark_Catalog.Default, System.Builtin
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+SELECT current_path()
+-- !query schema
+struct<current_path():string>
+-- !query output
+Spark_Catalog.Default,System.Builtin
+
+
+-- !query
+SET PATH = spark_catalog.`sch.b`, system.builtin
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+SELECT current_path()
+-- !query schema
+struct<current_path():string>
+-- !query output
+spark_catalog.`sch.b`,system.builtin
+
+
+-- !query
+SET PATH = DEFAULT_PATH
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+SET PATH = DEFAULT_PATH
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+SELECT current_path()
+-- !query schema
+struct<current_path():string>
+-- !query output
+system.builtin,system.session,spark_catalog.default
+
+
+-- !query
+SET PATH = SYSTEM_PATH
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+SELECT current_path()
+-- !query schema
+struct<current_path():string>
+-- !query output
+system.builtin,system.session
+
+
+-- !query
+SET PATH = spark_catalog.default, system.builtin
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+SET PATH = PATH, system.session
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+SELECT current_path()
+-- !query schema
+struct<current_path():string>
+-- !query output
+spark_catalog.default,system.builtin,system.session
+
+
+-- !query
+USE spark_catalog.default
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+SET PATH = current_schema, system.builtin
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+SELECT current_path()
+-- !query schema
+struct<current_path():string>
+-- !query output
+spark_catalog.default,system.builtin
+
+
+-- !query
+SET PATH = current_database, system.builtin
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+SELECT current_path()
+-- !query schema
+struct<current_path():string>
+-- !query output
+spark_catalog.default,system.builtin
+
+
+-- !query
+SET PATH = DEFAULT_PATH
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+SET PATH = spark_catalog.default, system.builtin
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+SELECT CURRENT_PATH = current_path() AS ansi_form_matches
+-- !query schema
+struct<ansi_form_matches:boolean>
+-- !query output
+true
+
+
+-- !query
+SELECT current_path(1)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "WRONG_NUM_ARGS.WITHOUT_SUGGESTION",
+  "sqlState" : "42605",
+  "messageParameters" : {
+    "actualNum" : "1",
+    "docroot" : "https://spark.apache.org/docs/latest";,
+    "expectedNum" : "0",
+    "functionName" : "`current_path`"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 22,
+    "fragment" : "current_path(1)"
+  } ]
+}
+
+
+-- !query
+SET PATH = DEFAULT_PATH
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+SET PATH = spark_catalog.default, spark_catalog.default
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "DUPLICATE_SQL_PATH_ENTRY",
+  "sqlState" : "42732",
+  "messageParameters" : {
+    "pathEntry" : "spark_catalog.default"
+  }
+}
+
+
+-- !query
+SET PATH = spark_catalog.DEFAULT, spark_catalog.default
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "DUPLICATE_SQL_PATH_ENTRY",
+  "sqlState" : "42732",
+  "messageParameters" : {
+    "pathEntry" : "spark_catalog.default"
+  }
+}
+
+
+-- !query
+SET PATH = DEFAULT_PATH, system.builtin
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "DUPLICATE_SQL_PATH_ENTRY",
+  "sqlState" : "42732",
+  "messageParameters" : {
+    "pathEntry" : "system.builtin"
+  }
+}
+
+
+-- !query
+SET PATH = SYSTEM_PATH, SYSTEM_PATH
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "DUPLICATE_SQL_PATH_ENTRY",
+  "sqlState" : "42732",
+  "messageParameters" : {
+    "pathEntry" : "system.builtin"
+  }
+}
+
+
+-- !query
+SET PATH = current_database, current_schema
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "DUPLICATE_SQL_PATH_ENTRY",
+  "sqlState" : "42732",
+  "messageParameters" : {
+    "pathEntry" : "current_schema"
+  }
+}
+
+
+-- !query
+SET PATH = my_schema_no_catalog
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.parser.ParseException
+{
+  "errorClass" : "INVALID_SQL_PATH_SCHEMA_REFERENCE",
+  "sqlState" : "42601",
+  "messageParameters" : {
+    "qualifiedName" : "my_schema_no_catalog"
+  }
+}
+
+
+-- !query
+CREATE SCHEMA sql_path_routines
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+CREATE FUNCTION sql_path_routines.pick() RETURNS INT RETURN 7
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+SET PATH = spark_catalog.sql_path_routines, spark_catalog.default, 
system.builtin
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+SELECT pick()
+-- !query schema
+struct<spark_catalog.sql_path_routines.pick():int>
+-- !query output
+7
+
+
+-- !query
+SET PATH = DEFAULT_PATH
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+CREATE FUNCTION sql_path_routines.pick_tvf()
+RETURNS TABLE(val INT)
+RETURN SELECT 7 AS val
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+SET PATH = spark_catalog.sql_path_routines, spark_catalog.default, 
system.builtin
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+SELECT * FROM pick_tvf()
+-- !query schema
+struct<val:int>
+-- !query output
+7
+
+
+-- !query
+SET PATH = DEFAULT_PATH
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+CREATE SCHEMA sql_path_routines_b
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+CREATE FUNCTION sql_path_routines_b.pick() RETURNS INT RETURN 11
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+SET PATH = spark_catalog.sql_path_routines, spark_catalog.sql_path_routines_b, 
system.builtin
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+SELECT pick() AS from_first_schema
+-- !query schema
+struct<from_first_schema:int>
+-- !query output
+7
+
+
+-- !query
+SET PATH = spark_catalog.sql_path_routines_b, spark_catalog.sql_path_routines, 
system.builtin
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+SELECT pick() AS from_first_schema
+-- !query schema
+struct<from_first_schema:int>
+-- !query output
+11
+
+
+-- !query
+SET PATH = DEFAULT_PATH
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+SET PATH = spark_catalog.default, system.builtin
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+SELECT pick()
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "UNRESOLVED_ROUTINE",
+  "sqlState" : "42883",
+  "messageParameters" : {
+    "routineName" : "`pick`",
+    "searchPath" : "[`spark_catalog`.`default`, `system`.`builtin`]"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 13,
+    "fragment" : "pick()"
+  } ]
+}
+
+
+-- !query
+SET PATH = DEFAULT_PATH
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+DROP FUNCTION sql_path_routines.pick
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+DROP FUNCTION sql_path_routines.pick_tvf
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+DROP FUNCTION sql_path_routines_b.pick
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+DROP SCHEMA sql_path_routines
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+DROP SCHEMA sql_path_routines_b
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+CREATE SCHEMA sql_path_relations_a
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+CREATE SCHEMA sql_path_relations_b
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+CREATE TABLE sql_path_relations_a.tbl USING parquet AS SELECT 1 AS id
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+CREATE TABLE sql_path_relations_b.tbl USING parquet AS SELECT 2 AS id
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+SET PATH = spark_catalog.sql_path_relations_a, 
spark_catalog.sql_path_relations_b, system.builtin
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+SELECT id FROM tbl AS from_first_schema
+-- !query schema
+struct<id:int>
+-- !query output
+1
+
+
+-- !query
+SET PATH = spark_catalog.sql_path_relations_b, 
spark_catalog.sql_path_relations_a, system.builtin
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+SELECT id FROM tbl AS from_first_schema
+-- !query schema
+struct<id:int>
+-- !query output
+2
+
+
+-- !query
+SET PATH = spark_catalog.default, system.builtin
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+SELECT id FROM tbl
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "TABLE_OR_VIEW_NOT_FOUND",
+  "sqlState" : "42P01",
+  "messageParameters" : {
+    "relationName" : "`tbl`",
+    "searchPath" : "[`spark_catalog`.`default`, `system`.`builtin`]"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 16,
+    "stopIndex" : 18,
+    "fragment" : "tbl"
+  } ]
+}
+
+
+-- !query
+SET PATH = DEFAULT_PATH
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+DROP TABLE sql_path_relations_a.tbl
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+DROP TABLE sql_path_relations_b.tbl
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+DROP SCHEMA sql_path_relations_a
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+DROP SCHEMA sql_path_relations_b
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+CREATE SCHEMA sql_path_views_a
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+CREATE SCHEMA sql_path_views_b
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+CREATE TABLE sql_path_views_a.frozen_t USING parquet AS SELECT 1 AS id
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+CREATE TABLE sql_path_views_b.frozen_t USING parquet AS SELECT 2 AS id
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+SET PATH = spark_catalog.sql_path_views_a, system.builtin
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+CREATE VIEW default.v_path_frozen AS SELECT id FROM frozen_t
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+SET PATH = spark_catalog.sql_path_views_b, system.builtin
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+SELECT id FROM frozen_t AS bare_lookup_uses_live_path
+-- !query schema
+struct<id:int>
+-- !query output
+2
+
+
+-- !query
+SELECT id FROM default.v_path_frozen AS view_body_uses_frozen_path
+-- !query schema
+struct<id:int>
+-- !query output
+1
+
+
+-- !query
+USE spark_catalog.sql_path_views_a
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+CREATE VIEW sql_path_views_a.v_ctx AS
+SELECT current_schema() AS cs, current_path() AS cp
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+USE spark_catalog.sql_path_views_b
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+SET PATH = DEFAULT_PATH
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+SELECT cs, cp FROM sql_path_views_a.v_ctx
+-- !query schema
+struct<cs:string,cp:string>
+-- !query output
+sql_path_views_b       
system.builtin,system.session,spark_catalog.sql_path_views_b
+
+
+-- !query
+USE spark_catalog.default
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+SET PATH = DEFAULT_PATH
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+DROP VIEW default.v_path_frozen
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+DROP VIEW sql_path_views_a.v_ctx
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+DROP TABLE sql_path_views_a.frozen_t
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+DROP TABLE sql_path_views_b.frozen_t
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+DROP SCHEMA sql_path_views_a
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+DROP SCHEMA sql_path_views_b
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+CREATE SCHEMA sql_path_fn_a
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+CREATE SCHEMA sql_path_fn_b
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+CREATE TABLE sql_path_fn_a.frozen_t USING parquet AS SELECT 10 AS id
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+CREATE TABLE sql_path_fn_b.frozen_t USING parquet AS SELECT 20 AS id
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+SET PATH = spark_catalog.sql_path_fn_a, system.builtin
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+CREATE FUNCTION default.frozen_fn()
+RETURNS INT
+RETURN (SELECT MAX(id) FROM frozen_t)
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+SET PATH = spark_catalog.sql_path_fn_b, system.builtin
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+SELECT MAX(id) FROM frozen_t AS bare_lookup_uses_live_path
+-- !query schema
+struct<max(id):int>
+-- !query output
+20
+
+
+-- !query
+SELECT default.frozen_fn() AS scalar_body_uses_frozen_path
+-- !query schema
+struct<scalar_body_uses_frozen_path:int>
+-- !query output
+10
+
+
+-- !query
+SET PATH = spark_catalog.sql_path_fn_a, system.builtin
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+CREATE FUNCTION default.frozen_tvf()
+RETURNS TABLE(id INT)
+RETURN SELECT MAX(id) AS id FROM frozen_t
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+SET PATH = spark_catalog.sql_path_fn_b, system.builtin
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+SELECT * FROM default.frozen_tvf() AS table_body_uses_frozen_path
+-- !query schema
+struct<id:int>
+-- !query output
+10
+
+
+-- !query
+USE spark_catalog.sql_path_fn_a
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+CREATE FUNCTION sql_path_fn_a.f_ctx()
+RETURNS STRING
+RETURN concat(current_schema(), '::', current_path())
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+USE spark_catalog.sql_path_fn_b
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+SET PATH = DEFAULT_PATH
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+SELECT sql_path_fn_a.f_ctx() AS invoker_context
+-- !query schema
+struct<invoker_context:string>
+-- !query output
+sql_path_fn_b::system.builtin,system.session,spark_catalog.sql_path_fn_b
+
+
+-- !query
+USE spark_catalog.default
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+SET PATH = DEFAULT_PATH
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+DROP FUNCTION default.frozen_fn
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+DROP FUNCTION default.frozen_tvf
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+DROP FUNCTION sql_path_fn_a.f_ctx
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+DROP TABLE sql_path_fn_a.frozen_t
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+DROP TABLE sql_path_fn_b.frozen_t
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+DROP SCHEMA sql_path_fn_a
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+DROP SCHEMA sql_path_fn_b
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+SET spark.sql.defaultPath = system.session, system.builtin
+-- !query schema
+struct<key:string,value:string>
+-- !query output
+spark.sql.defaultPath  system.session, system.builtin
+
+
+-- !query
+SET PATH = system.builtin, system.session
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+SELECT current_path() AS explicit_set_path_wins_over_conf
+-- !query schema
+struct<explicit_set_path_wins_over_conf:string>
+-- !query output
+system.builtin,system.session
+
+
+-- !query
+SET PATH = DEFAULT_PATH
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+RESET spark.sql.defaultPath
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+SET spark.sql.defaultPath = system.session, system.builtin, current_schema
+-- !query schema
+struct<key:string,value:string>
+-- !query output
+spark.sql.defaultPath  system.session, system.builtin, current_schema
+
+
+-- !query
+USE spark_catalog.default
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+SET PATH = DEFAULT_PATH
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+SELECT current_path() AS default_path_expands_to_conf
+-- !query schema
+struct<default_path_expands_to_conf:string>
+-- !query output
+system.session,system.builtin,spark_catalog.default
+
+
+-- !query
+RESET spark.sql.defaultPath
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+SET PATH = DEFAULT_PATH
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+SET spark.sql.defaultPath = this is not a path
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.SparkIllegalArgumentException
+{
+  "errorClass" : "INVALID_CONF_VALUE.REQUIREMENT",
+  "sqlState" : "22022",
+  "messageParameters" : {
+    "confName" : "spark.sql.defaultPath",
+    "confRequirement" : "The value must be empty or a comma-separated SET PATH 
element list (same grammar as SET PATH, except PATH is not allowed).",
+    "confValue" : "this is not a path"
+  }
+}
+
+
+-- !query
+SET spark.sql.defaultPath = PATH, system.builtin
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.SparkIllegalArgumentException
+{
+  "errorClass" : "INVALID_CONF_VALUE.REQUIREMENT",
+  "sqlState" : "22022",
+  "messageParameters" : {
+    "confName" : "spark.sql.defaultPath",
+    "confRequirement" : "The value must be empty or a comma-separated SET PATH 
element list (same grammar as SET PATH, except PATH is not allowed).",
+    "confValue" : "PATH, system.builtin"
+  }
+}
+
+
+-- !query
+SET spark.sql.path.enabled = false
+-- !query schema
+struct<key:string,value:string>
+-- !query output
+spark.sql.path.enabled false
+
+
+-- !query
+SELECT current_path() IS NOT NULL AS has_path
+-- !query schema
+struct<has_path:boolean>
+-- !query output
+true
+
+
+-- !query
+SET PATH = spark_catalog.default
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "UNSUPPORTED_FEATURE.SET_PATH_WHEN_DISABLED",
+  "sqlState" : "0A000",
+  "messageParameters" : {
+    "config" : "spark.sql.path.enabled"
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SetPathSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/SetPathSuite.scala
index 18b9f6b6f3b7..245398a4694e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SetPathSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SetPathSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql
 import org.apache.spark.SparkIllegalArgumentException
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.{IntegerType, LongType}
 
 /**
  * Tests for SET PATH command and session path management.
@@ -503,11 +504,110 @@ class SetPathSuite extends SharedSparkSession {
     }
   }
 
-  // TODO: cloneSession() constructs a new CatalogManager per forked session 
and
-  // explicitly copies only the stored session path via copySessionPathFrom.
-  // Other CatalogManager state propagation (current catalog/namespace, 
registered
-  // catalogs) on clone is currently incidental -- audit and pin down the 
intended
-  // semantics in a follow-up.
+  // --- cloneSession() propagation matrix 
--------------------------------------
+  // The cloned session is built via `BaseSessionStateBuilder` from a parent
+  // `SessionState`. Per-component hand-offs on clone:
+  //   - `SessionCatalog.copyStateTo` copies `currentDb` and `tempViews`,
+  //   - `CatalogManager.copySessionPathFrom` copies the stored `_sessionPath`,
+  //   - `functionRegistry.clone()` and `tableFunctionRegistry.clone()` copy
+  //     temporary functions.
+  // What is NOT propagated:
+  //   - the temp variable registry (new `TempVariableManager` per session),
+  //   - the `CatalogManager` current-catalog / current-namespace (re-read from
+  //     conf defaults in the child),
+  //   - the registered v2 `catalogs` map (lazy-loaded per session).
+  // The tests below pin this observed behavior so any future change has to
+  // update the assertions.
+
+  test("cloneSession: stored SET PATH propagates to the child session") {
+    withPathEnabled {
+      sql("SET PATH = spark_catalog.default, system.builtin")
+      try {
+        val child = spark.cloneSession()
+        val entries = pathEntries(
+          child.sql("SELECT current_path()").collect().head.getString(0))
+        assert(entries === Seq("spark_catalog.default", "system.builtin"),
+          s"Cloned session should inherit stored SET PATH; got: $entries")
+      } finally {
+        sql("SET PATH = DEFAULT_PATH")
+      }
+    }
+  }
+
+  test("cloneSession: USE SCHEMA on the parent propagates to the child") {
+    sql("CREATE SCHEMA IF NOT EXISTS path_clone_use")
+    try {
+      sql("USE spark_catalog.path_clone_use")
+      val child = spark.cloneSession()
+      val childDb = child.sql("SELECT current_database()").head().getString(0)
+      assert(childDb == "path_clone_use",
+        s"Cloned session should inherit the parent's current schema; got: 
$childDb")
+    } finally {
+      sql("USE spark_catalog.default")
+      sql("DROP SCHEMA IF EXISTS path_clone_use")
+    }
+  }
+
+  test("cloneSession: temp views on the parent propagate to the child") {
+    sql("CREATE TEMPORARY VIEW path_clone_view AS SELECT 1 AS c")
+    try {
+      val child = spark.cloneSession()
+      checkAnswer(child.sql("SELECT c FROM path_clone_view"), Row(1))
+    } finally {
+      sql("DROP VIEW IF EXISTS path_clone_view")
+    }
+  }
+
+  test("cloneSession: temp functions on the parent propagate to the child 
(cloned " +
+      "functionRegistry)") {
+    sql("CREATE TEMPORARY FUNCTION path_clone_fn() RETURNS INT RETURN 42")
+    try {
+      val child = spark.cloneSession()
+      checkAnswer(child.sql("SELECT path_clone_fn()"), Row(42))
+      // Snapshot semantics: dropping in the parent must not affect the 
already-cloned child.
+      sql("DROP TEMPORARY FUNCTION path_clone_fn")
+      checkAnswer(child.sql("SELECT path_clone_fn()"), Row(42))
+    } finally {
+      sql("DROP TEMPORARY FUNCTION IF EXISTS path_clone_fn")
+    }
+  }
+
+  test("cloneSession: temp variables on the parent are NOT propagated to the 
child") {
+    sql("DECLARE OR REPLACE VARIABLE path_clone_var INT DEFAULT 7")
+    try {
+      val child = spark.cloneSession()
+      val e = intercept[AnalysisException] {
+        child.sql("SELECT path_clone_var").collect()
+      }
+      // Either UNRESOLVED_VARIABLE or UNRESOLVED_COLUMN; both confirm the 
variable
+      // did not survive the clone.
+      assert(
+        e.getCondition == "UNRESOLVED_VARIABLE" ||
+          e.getCondition.startsWith("UNRESOLVED_COLUMN"),
+        s"Temp variables should NOT propagate to the clone; got: 
${e.getCondition}")
+    } finally {
+      sql("DROP TEMPORARY VARIABLE IF EXISTS path_clone_var")
+    }
+  }
+
+  test("cloneSession: child SET PATH does not leak back to the parent") {
+    withPathEnabled {
+      sql("SET PATH = spark_catalog.default, system.builtin")
+      try {
+        val child = spark.cloneSession()
+        child.sql("SET PATH = system.session, system.builtin")
+        val parentEntries = pathEntries(currentPath())
+        assert(parentEntries === Seq("spark_catalog.default", 
"system.builtin"),
+          s"Child SET PATH must not affect the parent; parent got: 
$parentEntries")
+        val childEntries = pathEntries(
+          child.sql("SELECT current_path()").collect().head.getString(0))
+        assert(childEntries === Seq("system.session", "system.builtin"),
+          s"Child SET PATH should be visible only in the child; child got: 
$childEntries")
+      } finally {
+        sql("SET PATH = DEFAULT_PATH")
+      }
+    }
+  }
 
   // --- Resolution tests: verify SET PATH affects actual table/function 
lookup ---
 
@@ -786,6 +886,169 @@ class SetPathSuite extends SharedSparkSession {
     }
   }
 
+  test("path-driven COUNT(*) rewrite gate: temp count shadowing builtin under 
SET PATH " +
+      "(session-first) suppresses the * -> 1 rewrite") {
+    // `Analyzer.matchesFunctionName` consults
+    // `FunctionResolution.isSessionBeforeBuiltinInPath` to decide whether 
COUNT(*) is the
+    // builtin (eligible for the COUNT(*) -> COUNT(1) shortcut) or a 
user-defined override.
+    // Default `sessionFunctionResolutionOrder` is "second", so creating a 
temp count while
+    // the default PATH is in effect passes the security check. Once SET PATH 
puts
+    // `system.session` before `system.builtin`, the rewrite must be 
suppressed and the
+    // star expansion must reach the temp `count`.
+    withPathEnabled {
+      sql("CREATE TEMPORARY FUNCTION count(x INT) RETURNS INT RETURN x + 100")
+      try {
+        // PATH still has builtin first: count(*) rewrites to count(1), which 
resolves to
+        // the builtin count and returns the row count of the input (1).
+        checkAnswer(sql("SELECT count(*) FROM VALUES (1) AS t(a)"), Row(1))
+
+        // Put session before builtin via SET PATH. The rewrite gate now 
reports
+        // `isSessionBeforeBuiltinInPath = true` AND a temp count exists, so 
the
+        // analyzer must NOT collapse `count(*)` to `count(1)`. The `*` then 
expands
+        // against the table's single column to `count(a)`, which resolves 
through
+        // the temp under the live path: 1 + 100 = 101.
+        sql("SET PATH = system.session, system.builtin")
+        checkAnswer(sql("SELECT count(*) FROM VALUES (1) AS t(a)"), Row(101))
+      } finally {
+        sql("SET PATH = DEFAULT_PATH")
+        sql("DROP TEMPORARY FUNCTION IF EXISTS count")
+      }
+    }
+  }
+
+  test("path-driven COUNT(*) rewrite gate: rewrite still applies for unrelated 
builtins") {
+    // The gate fires ONLY when a temp function with the same unqualified
+    // name as the builtin exists. A temp with a different name must not 
affect the
+    // COUNT(*) -> COUNT(1) shortcut even when session is searched before 
builtin.
+    withPathEnabled {
+      sql("CREATE TEMPORARY FUNCTION my_helper(x INT) RETURNS INT RETURN x + 
1")
+      try {
+        sql("SET PATH = system.session, system.builtin")
+        // No temp `count` exists; the rewrite still fires and the builtin row 
counter
+        // returns the row count of the input (3).
+        checkAnswer(sql("SELECT count(*) FROM VALUES (1), (2), (3) AS t(a)"), 
Row(3))
+      } finally {
+        sql("SET PATH = DEFAULT_PATH")
+        sql("DROP TEMPORARY FUNCTION IF EXISTS my_helper")
+      }
+    }
+  }
+
+  test("path-driven COUNT(*) rewrite gate: single-pass resolver suppresses the 
rewrite " +
+      "under SET PATH (session-first)") {
+    // The single-pass resolver mirrors the fixed-point gate via
+    // `FunctionResolverUtils.isUnqualifiedCountShadowedByTemp`, which is 
wired into
+    // `isNonDistinctCount` and consulted by `handleStarInArguments`.
+    //
+    // Setup (`CREATE TEMPORARY FUNCTION`, `SET PATH`) and execution (Dataset 
collect via
+    // checkAnswer, which inserts a `DeserializeToObject` node the single-pass 
analyzer
+    // does not yet support) are run under the fixed-point analyzer; only the 
actual
+    // count(*) analysis is run under the single-pass analyzer, and we assert 
against the
+    // analyzed plan's output schema. The builtin count returns BIGINT 
(rewrite applied);
+    // the temp count(INT) returns INT (rewrite suppressed and the star 
expansion routes
+    // through the temp), so the schema's first-field dataType tells us which 
branch fired.
+    withPathEnabled {
+      sql("CREATE TEMPORARY FUNCTION count(x INT) RETURNS INT RETURN x + 100")
+      try {
+        val countStarSql = "SELECT count(*) FROM VALUES (1) AS t(a)"
+
+        // PATH builtin-first: the single-pass gate reports
+        // `isUnqualifiedCountShadowedByTemp = false`, the shortcut fires, and 
the analyzed
+        // output is the BIGINT builtin count.
+        withSQLConf(SQLConf.ANALYZER_SINGLE_PASS_RESOLVER_ENABLED.key -> 
"true") {
+          val tpe = 
spark.sql(countStarSql).queryExecution.analyzed.schema.head.dataType
+          assert(tpe == LongType,
+            s"Expected BIGINT (builtin count rewrite); got: $tpe")
+        }
+
+        sql("SET PATH = system.session, system.builtin")
+
+        // PATH session-first: the gate reports true, the rewrite is 
suppressed, the star
+        // expands against `a`, and the temp count(INT) wins; analyzed output 
is INT.
+        withSQLConf(SQLConf.ANALYZER_SINGLE_PASS_RESOLVER_ENABLED.key -> 
"true") {
+          val tpe = 
spark.sql(countStarSql).queryExecution.analyzed.schema.head.dataType
+          assert(tpe == IntegerType,
+            s"Expected INT (temp count; rewrite suppressed); got: $tpe")
+        }
+      } finally {
+        sql("SET PATH = DEFAULT_PATH")
+        sql("DROP TEMPORARY FUNCTION IF EXISTS count")
+      }
+    }
+  }
+
+  test("PATH enabled: concurrent SET PATH and unqualified lookups do not 
deadlock") {
+    // SessionCatalog.lookupBuiltinOrTempFunction is intentionally NOT
+    // synchronized on SessionCatalog because the path-driven kinds provider 
acquires
+    // CatalogManager.synchronized, and another thread holding that lock can 
call back
+    // into SessionCatalog (e.g. via setCurrentNamespace). This test hammers 
both sides
+    // concurrently: one thread flips SET PATH while another performs 
unqualified
+    // function lookups that go through the kinds provider. Within the budget 
we should
+    // observe no deadlock and no spurious analysis failures.
+    withPathEnabled {
+      val budget = 200
+      val iterations = new java.util.concurrent.atomic.AtomicInteger(0)
+      val barrier = new java.util.concurrent.CyclicBarrier(2)
+      val errors = new java.util.concurrent.ConcurrentLinkedQueue[Throwable]()
+
+      val setterThread = new Thread(() => {
+        try {
+          barrier.await()
+          var i = 0
+          while (i < budget && errors.isEmpty) {
+            if ((i % 2) == 0) {
+              sql("SET PATH = spark_catalog.default, system.builtin")
+            } else {
+              sql("SET PATH = system.builtin, system.session, 
spark_catalog.default")
+            }
+            i += 1
+          }
+        } catch {
+          case t: Throwable => errors.add(t)
+        }
+      }, "SetPathSuite-setter")
+
+      val lookupThread = new Thread(() => {
+        try {
+          barrier.await()
+          var i = 0
+          while (i < budget && errors.isEmpty) {
+            // Forces unqualified function resolution against the live PATH 
and triggers
+            // the session-kinds provider on the catalog-manager side.
+            val n = sql("SELECT count(*) FROM VALUES (1), (2), (3) AS t(a)")
+              .head().getLong(0)
+            assert(n == 3L, s"unexpected count: $n at iteration $i")
+            iterations.incrementAndGet()
+            i += 1
+          }
+        } catch {
+          case t: Throwable => errors.add(t)
+        }
+      }, "SetPathSuite-lookup")
+
+      setterThread.start()
+      lookupThread.start()
+
+      // Generous join: 30s is plenty for 200 cheap queries on either side and 
gives a
+      // clear failure signal if the implementation regresses into a deadlock.
+      val joinMillis = 30000L
+      setterThread.join(joinMillis)
+      lookupThread.join(joinMillis)
+
+      assert(!setterThread.isAlive,
+        "SET PATH thread did not finish; potential deadlock between 
SessionCatalog and " +
+          "CatalogManager synchronized blocks.")
+      assert(!lookupThread.isAlive,
+        "Lookup thread did not finish; potential deadlock between 
SessionCatalog and " +
+          "CatalogManager synchronized blocks.")
+      assert(errors.isEmpty,
+        s"Concurrent lookups raised unexpected errors: 
${errors.toArray.mkString("; ")}")
+      assert(iterations.get() > 0,
+        "Lookup thread never completed a query; suspect contention or 
deadlock.")
+      sql("SET PATH = DEFAULT_PATH")
+    }
+  }
+
   test("DEFAULT_PATH conf: duplicate entries are tolerated (first-match 
resolution)") {
     // Lookup uses first-match resolution, so redundant entries on 
DEFAULT_PATH are dead code
     // rather than an error. (Contrast with SET PATH, which still rejects 
static duplicates as
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/SqlPathV2CatalogSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/SqlPathV2CatalogSuite.scala
new file mode 100644
index 000000000000..9e365c720266
--- /dev/null
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/SqlPathV2CatalogSuite.scala
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector
+
+import java.util.Collections
+
+import org.apache.spark.sql.{AnalysisException, Row}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.connector.catalog.{Identifier, InMemoryCatalog, 
SupportsNamespaces}
+import org.apache.spark.sql.connector.catalog.functions.{ScalarFunction, 
UnboundFunction}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.{DataType, IntegerType, StringType}
+
+/**
+ * End-to-end coverage of [[SQLConf.PATH_ENABLED]] resolution through 
non-session V2 catalogs.
+ *
+ * Other path tests live in `SetPathSuite` (session catalog) and 
`ProcedureSuite`
+ * (procedures via CALL). This suite specifically exercises:
+ *   - unqualified table resolution across two V2 catalogs in SET PATH,
+ *   - first-match ordering when both catalogs hold the same name,
+ *   - unqualified V2 function resolution across two V2 catalogs in SET PATH,
+ *   - the negative case where the unqualified name only lives in a catalog
+ *     that is NOT on the path.
+ */
+class SqlPathV2CatalogSuite extends SharedSparkSession {
+
+  private val emptyProps: java.util.Map[String, String] = 
Collections.emptyMap()
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    spark.conf.set("spark.sql.catalog.pathcat", 
classOf[InMemoryCatalog].getName)
+    spark.conf.set("spark.sql.catalog.pathcat2", 
classOf[InMemoryCatalog].getName)
+  }
+
+  override def afterAll(): Unit = {
+    try {
+      spark.sessionState.catalogManager.reset()
+      spark.sessionState.conf.unsetConf("spark.sql.catalog.pathcat")
+      spark.sessionState.conf.unsetConf("spark.sql.catalog.pathcat2")
+    } finally {
+      super.afterAll()
+    }
+  }
+
+  private def v2Catalog(name: String): InMemoryCatalog =
+    
spark.sessionState.catalogManager.catalog(name).asInstanceOf[InMemoryCatalog]
+
+  private def createV2Namespace(catalog: String, ns: String): Unit = {
+    v2Catalog(catalog).asInstanceOf[SupportsNamespaces]
+      .createNamespace(Array(ns), emptyProps)
+  }
+
+  private def addV2Function(
+      catalog: String,
+      ns: String,
+      name: String,
+      fn: UnboundFunction): Unit = {
+    v2Catalog(catalog).createFunction(Identifier.of(Array(ns), name), fn)
+  }
+
+  test("V2 catalogs on SET PATH: unqualified table follows first match") {
+    withSQLConf(SQLConf.PATH_ENABLED.key -> "true") {
+      // pathcat and pathcat2 each have a namespace `ns` and a table 
`path_v2_t` with
+      // different contents, so we can tell which catalog supplied the row.
+      createV2Namespace("pathcat", "ns")
+      createV2Namespace("pathcat2", "ns")
+      sql("CREATE TABLE pathcat.ns.path_v2_t (id INT) USING foo")
+      sql("INSERT INTO pathcat.ns.path_v2_t VALUES (10)")
+      sql("CREATE TABLE pathcat2.ns.path_v2_t (id INT) USING foo")
+      sql("INSERT INTO pathcat2.ns.path_v2_t VALUES (20)")
+
+      try {
+        sql("SET PATH = pathcat.ns, pathcat2.ns, system.builtin")
+        checkAnswer(sql("SELECT id FROM path_v2_t"), Row(10))
+
+        sql("SET PATH = pathcat2.ns, pathcat.ns, system.builtin")
+        checkAnswer(sql("SELECT id FROM path_v2_t"), Row(20))
+      } finally {
+        sql("SET PATH = DEFAULT_PATH")
+        sql("DROP TABLE IF EXISTS pathcat.ns.path_v2_t")
+        sql("DROP TABLE IF EXISTS pathcat2.ns.path_v2_t")
+      }
+    }
+  }
+
+  test("V2 catalogs on SET PATH: unqualified table only in a non-path catalog 
is not found") {
+    withSQLConf(SQLConf.PATH_ENABLED.key -> "true") {
+      createV2Namespace("pathcat", "ns_only_here")
+      sql("CREATE TABLE pathcat.ns_only_here.hidden_t (id INT) USING foo")
+      try {
+        // Path does not include pathcat.ns_only_here; bare `hidden_t` must 
not resolve.
+        sql("SET PATH = pathcat2.ns, system.builtin")
+        val e = intercept[AnalysisException] {
+          sql("SELECT id FROM hidden_t").collect()
+        }
+        assert(e.getCondition == "TABLE_OR_VIEW_NOT_FOUND" ||
+            e.getMessage.contains("TABLE_OR_VIEW_NOT_FOUND"),
+          s"Expected TABLE_OR_VIEW_NOT_FOUND; got: ${e.getCondition}: 
${e.getMessage}")
+      } finally {
+        sql("SET PATH = DEFAULT_PATH")
+        sql("DROP TABLE IF EXISTS pathcat.ns_only_here.hidden_t")
+      }
+    }
+  }
+
+  test("V2 catalogs on SET PATH: unqualified function follows first match") {
+    withSQLConf(SQLConf.PATH_ENABLED.key -> "true") {
+      // Two V2 catalogs each register a `strlen` function under the same name 
but with
+      // distinguishable return values: pathcat returns the true length, 
pathcat2 returns
+      // the length times 100. The result distinguishes which catalog supplied 
the
+      // function for the same argument, so swapping the path order must 
change the row.
+      createV2Namespace("pathcat", "fns")
+      createV2Namespace("pathcat2", "fns")
+      addV2Function("pathcat", "fns", "strlen", StrLen(StrLenDefault))
+      addV2Function("pathcat2", "fns", "strlen", StrLen(StrLenTimes100))
+      try {
+        sql("SET PATH = pathcat.fns, pathcat2.fns, system.builtin")
+        checkAnswer(sql("SELECT strlen('abc')"), Row(3))
+
+        sql("SET PATH = pathcat2.fns, pathcat.fns, system.builtin")
+        checkAnswer(sql("SELECT strlen('abc')"), Row(300))
+      } finally {
+        sql("SET PATH = DEFAULT_PATH")
+        v2Catalog("pathcat").clearFunctions()
+        v2Catalog("pathcat2").clearFunctions()
+      }
+    }
+  }
+}
+
+/**
+ * A small distinguishable companion to `StrLenDefault` (in 
`DataSourceV2FunctionSuite.scala`):
+ * returns `s.length * 100` so V2-function resolution tests across catalogs 
can verify which
+ * catalog supplied the function from the result row alone.
+ */
+case object StrLenTimes100 extends ScalarFunction[Int] {
+  override def inputTypes(): Array[DataType] = Array(StringType)
+  override def resultType(): DataType = IntegerType
+  override def name(): String = "strlen_times_100"
+  override def produceResult(input: InternalRow): Int = 
input.getString(0).length * 100
+}
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
index f6ace55849d2..3fb54d7c43d5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
@@ -1453,6 +1453,106 @@ abstract class SQLViewSuite extends QueryTest {
     }
   }
 
+  test("SPARK-56853: stored view path is ignored when PATH is disabled at read 
time") {
+    // A view created with PATH enabled persists two things in metadata: the 
frozen
+    // resolution path AND the creator session's current catalog+namespace at 
CREATE
+    // VIEW time (the view's `viewCatalogAndNamespace` property). If the 
reader's
+    // session has `spark.sql.path.enabled=false`, the pinned entries are 
intentionally
+    // dropped (`CatalogManager.resolutionPathEntriesForAnalysis`); the view 
body's
+    // unqualified references fall back to that captured catalog+namespace, 
which is
+    // the creator's USE state at CREATE time -- NOT the schema the view 
physically
+    // lives in (the two coincide below only because the test runs
+    // `USE spark_catalog.compat_view_b` before CREATE VIEW). Verify both 
directions:
+    //   - fully-qualified bodies keep working (qualification doesn't depend 
on PATH),
+    //   - unqualified bodies that relied on the frozen path now resolve via 
the
+    //     captured viewCatalogAndNamespace.
+    withDatabase("compat_view_a", "compat_view_b") {
+      sql("CREATE DATABASE compat_view_a")
+      sql("CREATE DATABASE compat_view_b")
+      withTable(
+          "compat_view_a.compat_t",
+          "compat_view_b.compat_t") {
+        sql("CREATE TABLE compat_view_a.compat_t USING parquet AS SELECT 1 AS 
id")
+        sql("CREATE TABLE compat_view_b.compat_t USING parquet AS SELECT 2 AS 
id")
+        withView(
+            "compat_view_b.v_unq_path",
+            "compat_view_b.v_fq_path") {
+          // Create both views with USE compat_view_b in effect so the stored
+          // viewCatalogAndNamespace points at compat_view_b, then SET PATH=a 
so the
+          // frozen path pins compat_view_a.
+          withSQLConf(PATH_ENABLED.key -> "true") {
+            try {
+              sql("USE spark_catalog.compat_view_b")
+              sql("SET PATH = spark_catalog.compat_view_a, system.builtin")
+              sql(
+                """
+                  |CREATE VIEW compat_view_b.v_unq_path AS
+                  |SELECT id FROM compat_t
+                  |""".stripMargin)
+              sql(
+                """
+                  |CREATE VIEW compat_view_b.v_fq_path AS
+                  |SELECT id FROM spark_catalog.compat_view_a.compat_t
+                  |""".stripMargin)
+            } finally {
+              sql("SET PATH = DEFAULT_PATH")
+              sql("USE spark_catalog.default")
+            }
+          }
+
+          // Now read with PATH disabled. The fully-qualified view body is 
independent of
+          // PATH and must keep returning rows from compat_view_a. The 
unqualified-body view
+          // drops its frozen-path pin and falls back to 
viewCatalogAndNamespace
+          // (compat_view_b), so unqualified `compat_t` resolves to 
compat_view_b.compat_t.
+          withSQLConf(PATH_ENABLED.key -> "false") {
+            checkAnswer(sql("SELECT id FROM compat_view_b.v_fq_path"), Row(1))
+            checkAnswer(sql("SELECT id FROM compat_view_b.v_unq_path"), Row(2))
+          }
+        }
+      }
+    }
+  }
+
+  test("SPARK-56853: stored view path with no fallback target fails clearly 
when PATH is off") {
+    // Same shape as the previous test, but the captured 
`viewCatalogAndNamespace`
+    // (the creator's USE state at CREATE VIEW time -- set here via
+    // `USE spark_catalog.compat_home_only`) does NOT contain the unqualified 
name.
+    // Under PATH disabled the analyzer cannot fall back anywhere, so the 
lookup
+    // must raise TABLE_OR_VIEW_NOT_FOUND against that captured 
catalog+namespace.
+    withDatabase("compat_home_only", "compat_referenced") {
+      sql("CREATE DATABASE compat_home_only")
+      sql("CREATE DATABASE compat_referenced")
+      withTable("compat_referenced.only_here") {
+        sql("CREATE TABLE compat_referenced.only_here USING parquet AS SELECT 
7 AS id")
+        withView("compat_home_only.v_unq_home") {
+          withSQLConf(PATH_ENABLED.key -> "true") {
+            try {
+              sql("USE spark_catalog.compat_home_only")
+              sql("SET PATH = spark_catalog.compat_referenced, system.builtin")
+              sql(
+                """
+                  |CREATE VIEW compat_home_only.v_unq_home AS
+                  |SELECT id FROM only_here
+                  |""".stripMargin)
+            } finally {
+              sql("SET PATH = DEFAULT_PATH")
+              sql("USE spark_catalog.default")
+            }
+          }
+
+          withSQLConf(PATH_ENABLED.key -> "false") {
+            val e = intercept[AnalysisException] {
+              sql("SELECT id FROM compat_home_only.v_unq_home").collect()
+            }
+            assert(e.getCondition == "TABLE_OR_VIEW_NOT_FOUND" ||
+                e.getMessage.contains("TABLE_OR_VIEW_NOT_FOUND"),
+              s"Expected TABLE_OR_VIEW_NOT_FOUND; got: ${e.getCondition}: 
${e.getMessage}")
+          }
+        }
+      }
+    }
+  }
+
   // Regression guard: frozen resolution path must not leak into 
CURRENT_SCHEMA/CURRENT_PATH.
   test("SPARK-56639: current_schema/current_path in persisted view use invoker 
context") {
     withSQLConf(PATH_ENABLED.key -> "true") {
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/AlterViewSchemaBindingSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/AlterViewSchemaBindingSuite.scala
index 39e6e708403a..6b1174856529 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/AlterViewSchemaBindingSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/AlterViewSchemaBindingSuite.scala
@@ -17,7 +17,53 @@
 
 package org.apache.spark.sql.execution.command.v1
 
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.connector.catalog.CatalogManager
 import org.apache.spark.sql.execution.command
+import org.apache.spark.sql.internal.SQLConf
 
 class AlterViewSchemaBindingSuite
-  extends command.AlterViewSchemaBindingSuiteBase with ViewCommandSuiteBase
+  extends command.AlterViewSchemaBindingSuiteBase with ViewCommandSuiteBase {
+
+  test("ALTER VIEW ... WITH SCHEMA preserves the frozen SQL path") {
+    // `generateViewProperties(captureNewPath = false)` is the documented 
behavior for
+    // ALTER VIEW WITH SCHEMA: the view's body resolution path must stay 
pinned to the
+    // create-time PATH, not the caller's current PATH. This test creates the 
view under
+    // PATH=a, then runs ALTER VIEW WITH SCHEMA EVOLUTION under PATH=b, and 
asserts that
+    // the persisted VIEW_RESOLUTION_PATH still reflects PATH=a.
+    withSQLConf(SQLConf.PATH_ENABLED.key -> "true") {
+      val viewName = "v_path_preserved_on_alter"
+      val view = s"$catalog.$namespace.$viewName"
+      sql(s"CREATE SCHEMA IF NOT EXISTS $catalog.alter_view_path_a")
+      try {
+        sql(s"SET PATH = $catalog.alter_view_path_a, system.builtin")
+        sql(s"CREATE VIEW $view AS SELECT 1 AS x")
+        val pathAfterCreate = spark.sessionState.catalog
+          .getTableMetadata(TableIdentifier(viewName, Some(namespace)))
+          .viewStoredResolutionPath
+          .getOrElse(fail("Expected the view to persist a frozen SQL path"))
+        val parsedCreate = 
CatalogManager.deserializePathEntries(pathAfterCreate)
+          .getOrElse(fail(s"Expected a valid serialized path, got: 
$pathAfterCreate"))
+        assert(parsedCreate.contains(Seq(catalog, "alter_view_path_a")),
+          s"Frozen path should include alter_view_path_a; got: $parsedCreate")
+
+        // Switch the live PATH to something else and run ALTER VIEW WITH 
SCHEMA.
+        // The captureNewPath = false code path must NOT overwrite the frozen 
path.
+        sql(s"SET PATH = $catalog.default, system.builtin")
+        sql(s"ALTER VIEW $view WITH SCHEMA EVOLUTION")
+
+        val pathAfterAlter = spark.sessionState.catalog
+          .getTableMetadata(TableIdentifier(viewName, Some(namespace)))
+          .viewStoredResolutionPath
+          .getOrElse(fail("Frozen SQL path was dropped by ALTER VIEW WITH 
SCHEMA"))
+        assert(pathAfterAlter == pathAfterCreate,
+          s"ALTER VIEW WITH SCHEMA must preserve the frozen path. " +
+            s"Before: $pathAfterCreate; after: $pathAfterAlter")
+      } finally {
+        sql("SET PATH = DEFAULT_PATH")
+        sql(s"DROP VIEW IF EXISTS $view")
+        sql(s"DROP SCHEMA IF EXISTS $catalog.alter_view_path_a")
+      }
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to