This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new fa872498ed6e [SPARK-55891][SQL] Preserve the SQL scripting context 
inside EXECUTE IMMEDIATE
fa872498ed6e is described below

commit fa872498ed6e91ded378436ba233506839134fc7
Author: ilicmarkodb <[email protected]>
AuthorDate: Wed Mar 11 22:55:53 2026 +0800

    [SPARK-55891][SQL] Preserve the SQL scripting context inside EXECUTE 
IMMEDIATE
    
    ### What changes were proposed in this pull request?
    In this PR, I propose preserving the SQL Scripting context inside `EXECUTE 
IMMEDIATE`, instead of setting it to `null` while executing the command. I 
propose persisting everything except the variable manager, since local 
variables should not be resolvable inside the `EXECUTE IMMEDIATE` body.
    
    ### Why are the changes needed?
    Improvement: previously the entire SQL scripting context was cleared while 
executing `EXECUTE IMMEDIATE`, losing scripting state that should survive. With 
this change only the local variable manager is hidden, so the rest of the 
scripting context is preserved.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    New tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    Yes, Claude.
    
    Closes #54691 from ilicmarkodb/execute_imm_context.
    
    Authored-by: ilicmarkodb <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../sql/catalyst/analysis/ResolveCatalogs.scala    | 18 ++--
 .../sql/catalyst/analysis/VariableResolution.scala |  2 +-
 .../catalyst/analysis/resolver/ResolverGuard.scala |  2 +-
 .../catalog/SqlScriptingContextManager.scala       |  4 +-
 .../analysis/ResolveExecuteImmediate.scala         | 49 +++++++----
 .../execution/command/v2/CreateVariableExec.scala  |  2 +-
 .../sql/execution/command/v2/FetchCursorExec.scala |  2 +-
 .../sql/execution/command/v2/SetVariableExec.scala |  2 +-
 .../command/v2/VariableAssignmentUtils.scala       |  2 +-
 .../scripting/SqlScriptingContextManagerImpl.scala |  2 +-
 .../execution/ExecuteImmediateEndToEndSuite.scala  | 99 +++++++++++++++++++++-
 11 files changed, 152 insertions(+), 32 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
index 5fa8ffefc012..9459a043b095 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
@@ -45,9 +45,7 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
       // We resolve only UnresolvedIdentifiers, and pass on the other nodes
       val resolved = identifiers.map {
         case UnresolvedIdentifier(nameParts, _) =>
-          // From scripts we can only create local variables, which must be 
unqualified,
-          // and must not be DECLARE OR REPLACE.
-          if (withinSqlScript) {
+          if (withinLocalVariableScope) {
             if (c.replace) {
               throw new AnalysisException(
                 "INVALID_VARIABLE_DECLARATION.REPLACE_LOCAL_VARIABLE",
@@ -61,7 +59,7 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
                 Map("varName" -> toSQLId(nameParts)))
             }
 
-            SqlScriptingContextManager.get().map(_.getVariableManager)
+            SqlScriptingContextManager.get().flatMap(_.getVariableManager)
               .getOrElse(throw SparkException.internalError(
                 "Scripting local variable manager should be present in SQL 
script."))
               .qualify(nameParts.last)
@@ -77,7 +75,7 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
       c.copy(names = resolved)
 
     case d @ DropVariable(UnresolvedIdentifier(nameParts, _), _) =>
-      if (withinSqlScript) {
+      if (withinLocalVariableScope) {
         throw new AnalysisException(
           "UNSUPPORTED_FEATURE.SQL_SCRIPTING_DROP_TEMPORARY_VARIABLE", 
Map.empty)
       }
@@ -203,8 +201,14 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
     }
   }
 
-  private def withinSqlScript: Boolean =
-    SqlScriptingContextManager.get().map(_.getVariableManager).isDefined
+  /**
+   * Whether we are within a local variable scope. This is true when we are 
directly inside a
+   * SQL script and local variable rules apply (unqualified DECLARE only, no 
OR REPLACE, no DROP).
+   * EXECUTE IMMEDIATE inside a script is not within a local variable scope, 
since it works
+   * with session variables as if it were outside the script.
+   */
+  private def withinLocalVariableScope: Boolean =
+    SqlScriptingContextManager.get().flatMap(_.getVariableManager).isDefined
 
   private def assertValidSessionVariableNameParts(
       nameParts: Seq[String],
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/VariableResolution.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/VariableResolution.scala
index 2a1f35b0859d..0095885c0135 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/VariableResolution.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/VariableResolution.scala
@@ -108,7 +108,7 @@ class VariableResolution(tempVariableManager: 
TempVariableManager) extends SQLCo
 
     SqlScriptingContextManager
       .get()
-      .map(_.getVariableManager)
+      .flatMap(_.getVariableManager)
       // If variable name is qualified with session.<varName> treat it as a 
session variable.
       .filterNot(
         _ =>
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/ResolverGuard.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/ResolverGuard.scala
index 4639e0bb3dcd..e0d72a72a65e 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/ResolverGuard.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/ResolverGuard.scala
@@ -519,7 +519,7 @@ class ResolverGuard(catalogManager: CatalogManager) extends 
SQLConfHelper {
     catalogManager.tempVariableManager.isEmpty
 
   private def checkScriptingVariables() =
-    
SqlScriptingContextManager.get().map(_.getVariableManager).forall(_.isEmpty)
+    
SqlScriptingContextManager.get().flatMap(_.getVariableManager).forall(_.isEmpty)
 
   private def tryThrowUnsupportedSinglePassAnalyzerFeature(operator: 
LogicalPlan): Unit = {
     tryThrowUnsupportedSinglePassAnalyzerFeature(s"${operator.getClass} 
operator resolution")
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SqlScriptingContextManager.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SqlScriptingContextManager.scala
index dcf38558afb7..196591b6d982 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SqlScriptingContextManager.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SqlScriptingContextManager.scala
@@ -27,7 +27,7 @@ trait SqlScriptingContextManager {
   def getContext: SqlScriptingExecutionContextExtension
 
   /**
-   * Get variable manager
+   * Get the variable manager, if available.
    */
-  def getVariableManager: VariableManager
+  def getVariableManager: Option[VariableManager]
 }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveExecuteImmediate.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveExecuteImmediate.scala
index 03c04482893b..bf57d8df5229 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveExecuteImmediate.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveExecuteImmediate.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.analysis
 
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.SqlScriptingContextManager
+import org.apache.spark.sql.catalyst.catalog.{SqlScriptingContextManager => 
SqlScriptingContextManagerTrait}
 import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, 
VariableReference}
 import org.apache.spark.sql.catalyst.plans.logical.{Command, CompoundBody, 
LogicalPlan, SetVariable}
 import org.apache.spark.sql.catalyst.rules.Rule
@@ -96,10 +97,10 @@ case class ResolveExecuteImmediate(sparkSession: 
SparkSession, catalogManager: C
       stopIndex = Some(sqlString.length - 1)
     )
 
-    // Execute the query recursively with isolated local variable context and 
EXECUTE IMMEDIATE
-    // origin. The isolation must cover parsing, analysis, and execution 
phases.
-    // CurrentOrigin.withOrigin ensures expressions created during parsing get 
the proper context
-    val result = withIsolatedLocalVariableContext {
+    // Execute the query with local variables hidden and EXECUTE IMMEDIATE 
origin set.
+    // Both must cover parsing, analysis, and execution phases.
+    // CurrentOrigin.withOrigin ensures expressions created during parsing get 
the proper context.
+    val result = withHiddenLocalVariables {
       CurrentOrigin.withOrigin(executeImmediateOrigin) {
         // Use shared parameterized query execution logic (same as OPEN CURSOR)
         val df = if (args.isEmpty) {
@@ -121,9 +122,9 @@ case class ResolveExecuteImmediate(sparkSession: 
SparkSession, catalogManager: C
           throw QueryCompilationErrors.sqlScriptInExecuteImmediate(sqlString)
         }
 
-        // Force analysis to happen within the isolated context to ensure 
local variables
-        // are not accessible. This is critical because DataFrames are lazy 
and analysis
-        // would otherwise happen outside the isolation context.
+        // Force analysis to happen while local variables are hidden. This is 
critical because
+        // DataFrames are lazy and analysis would otherwise happen after 
withHiddenLocalVariables
+        // has restored the original context.
         df.queryExecution.analyzed
         df
       }
@@ -185,14 +186,32 @@ case class ResolveExecuteImmediate(sparkSession: 
SparkSession, catalogManager: C
   }
 
   /**
-   * Temporarily isolates the SQL scripting context during EXECUTE IMMEDIATE 
execution.
-   * This makes withinSqlScript() return false, ensuring that statements 
within EXECUTE IMMEDIATE
-   * are not affected by the outer SQL script context (e.g., local variables, 
script-specific
-   * errors).
+   * Temporarily hides the local variable context while executing the `EXECUTE 
IMMEDIATE` command.
+   * This is expected behavior, as local variables cannot be resolved within 
the body of this
+   * command. This does not apply to session variables. The rest of the SQL 
scripting context is
+   * preserved.
+   *
+   * {{{
+   *   DECLARE VARIABLE v1 = 1; -- Session variable.
+   *   BEGIN
+   *     DECLARE v2 = 2; -- Local variable.
+   *     EXECUTE IMMEDIATE 'SELECT v1'; -- Should work.
+   *     EXECUTE IMMEDIATE 'SELECT v2'; -- Should fail.
+   *     EXECUTE IMMEDIATE 'SELECT ?' USING v2; -- Should work.
+   *   END
+   * }}}
    */
-  private def withIsolatedLocalVariableContext[A](f: => A): A = {
-    // Completely clear the SQL scripting context to make withinSqlScript() 
return false
-    val handle = SqlScriptingContextManager.create(null)
-    handle.runWith(f)
+  private def withHiddenLocalVariables[A](f: => A): A = {
+    val newContextManager = SqlScriptingContextManager.get() match {
+      case Some(contextManager) =>
+        // SqlScriptingContextManagerTrait is 
catalog.SqlScriptingContextManager, renamed on import
+        // to distinguish from the object of the same name in 
org.apache.spark.sql.catalyst.
+        new SqlScriptingContextManagerTrait {
+          override def getContext = contextManager.getContext
+          override def getVariableManager = None
+        }
+      case None => null
+    }
+    SqlScriptingContextManager.create(newContextManager).runWith(f)
   }
 }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/v2/CreateVariableExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/v2/CreateVariableExec.scala
index e625f02b8bbe..0f2777db2926 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/v2/CreateVariableExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/v2/CreateVariableExec.scala
@@ -39,7 +39,7 @@ case class CreateVariableExec(
     replace: Boolean) extends LeafV2CommandExec with ExpressionsEvaluator {
 
   override protected def run(): Seq[InternalRow] = {
-    val scriptingVariableManager = 
SqlScriptingContextManager.get().map(_.getVariableManager)
+    val scriptingVariableManager = 
SqlScriptingContextManager.get().flatMap(_.getVariableManager)
     val tempVariableManager = 
session.sessionState.catalogManager.tempVariableManager
 
     val exprs = prepareExpressions(Seq(defaultExpr.child), 
subExprEliminationEnabled = false)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/v2/FetchCursorExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/v2/FetchCursorExec.scala
index b2c42d64c0a1..ad867e653767 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/v2/FetchCursorExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/v2/FetchCursorExec.scala
@@ -180,7 +180,7 @@ case class FetchCursorExec(
     // Select the appropriate variable manager based on the catalog
     // This logic matches SetVariableExec.setVariable()
     val tempVariableManager = 
session.sessionState.catalogManager.tempVariableManager
-    val scriptingVariableManager = 
SqlScriptingContextManager.get().map(_.getVariableManager)
+    val scriptingVariableManager = 
SqlScriptingContextManager.get().flatMap(_.getVariableManager)
 
     val variableManager = varRef.catalog match {
       case FakeLocalCatalog if scriptingVariableManager.isEmpty =>
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/v2/SetVariableExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/v2/SetVariableExec.scala
index a5b64736a618..ef8e238832b3 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/v2/SetVariableExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/v2/SetVariableExec.scala
@@ -66,7 +66,7 @@ case class SetVariableExec(variables: Seq[VariableReference], 
query: SparkPlan)
     }
 
     val tempVariableManager = 
session.sessionState.catalogManager.tempVariableManager
-    val scriptingVariableManager = 
SqlScriptingContextManager.get().map(_.getVariableManager)
+    val scriptingVariableManager = 
SqlScriptingContextManager.get().flatMap(_.getVariableManager)
 
     val variableManager = variable.catalog match {
       case FakeLocalCatalog if scriptingVariableManager.isEmpty =>
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/v2/VariableAssignmentUtils.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/v2/VariableAssignmentUtils.scala
index 98c597877e32..3a4d55169d90 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/v2/VariableAssignmentUtils.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/v2/VariableAssignmentUtils.scala
@@ -54,7 +54,7 @@ object VariableAssignmentUtils {
       varRef.originalNameParts.map(_.toLowerCase(Locale.ROOT))
     }
 
-    val scriptingVariableManager = 
SqlScriptingContextManager.get().map(_.getVariableManager)
+    val scriptingVariableManager = 
SqlScriptingContextManager.get().flatMap(_.getVariableManager)
 
     val variableManager = varRef.catalog match {
       case FakeLocalCatalog if scriptingVariableManager.isEmpty =>
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/scripting/SqlScriptingContextManagerImpl.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/scripting/SqlScriptingContextManagerImpl.scala
index 884269d85f99..a4dbfca1d31a 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/scripting/SqlScriptingContextManagerImpl.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/scripting/SqlScriptingContextManagerImpl.scala
@@ -30,5 +30,5 @@ class SqlScriptingContextManagerImpl(context: 
SqlScriptingExecutionContext)
 
   override def getContext: SqlScriptingExecutionContextExtension = context
 
-  override def getVariableManager: VariableManager = variableManager
+  override def getVariableManager: Option[VariableManager] = 
Some(variableManager)
 }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExecuteImmediateEndToEndSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExecuteImmediateEndToEndSuite.scala
index c252047b3abe..a823c1bab5f3 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExecuteImmediateEndToEndSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExecuteImmediateEndToEndSuite.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.spark.sql.execution
 
-import org.apache.spark.sql.{AnalysisException, QueryTest}
+import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
 import org.apache.spark.sql.test.SharedSparkSession
 
 class ExecuteImmediateEndToEndSuite extends QueryTest with SharedSparkSession {
@@ -46,4 +46,101 @@ class ExecuteImmediateEndToEndSuite extends QueryTest with 
SharedSparkSession {
       condition = "SQL_SCRIPT_IN_EXECUTE_IMMEDIATE",
       parameters = Map("sqlString" -> "BEGIN SELECT 1; END"))
   }
+
+  test("EXECUTE IMMEDIATE resolves session variables in body") {
+    withSessionVariable("v1", "v2") {
+      spark.sql("DECLARE v1 = 42")
+      spark.sql("DECLARE v2 = 99")
+      checkAnswer(spark.sql("EXECUTE IMMEDIATE 'SELECT system.session.v1, 
v2'"), Row(42, 99))
+    }
+  }
+
+  test("EXECUTE IMMEDIATE resolves session variables inside script") {
+    withSessionVariable("v1", "v2") {
+      spark.sql("DECLARE v1 = 10")
+      spark.sql("DECLARE v2 = 20")
+      val result = spark.sql(
+        """
+          |BEGIN
+          |  DECLARE v3 = 1;
+          |  EXECUTE IMMEDIATE 'SELECT system.session.v1, v2';
+          |END
+          |""".stripMargin)
+      checkAnswer(result, Row(10, 20))
+    }
+  }
+
+  test("EXECUTE IMMEDIATE does not resolve local variables") {
+    val result = intercept[AnalysisException] {
+      spark.sql(
+        """
+          |BEGIN
+          |  DECLARE v1 = 5;
+          |  EXECUTE IMMEDIATE 'SELECT v1';
+          |END
+          |""".stripMargin)
+    }
+    checkError(
+      exception = result,
+      condition = "UNRESOLVED_COLUMN.WITHOUT_SUGGESTION",
+      sqlState = "42703",
+      parameters = Map("objectName" -> "`v1`"),
+      context = ExpectedContext(
+        objectType = "EXECUTE IMMEDIATE",
+        objectName = "",
+        startIndex = 7,
+        stopIndex = 8,
+        fragment = "v1"))
+  }
+
+  test("EXECUTE IMMEDIATE resolves local variable in USING clause") {
+    val result = spark.sql(
+      """
+        |BEGIN
+        |  DECLARE v1 = 5;
+        |  EXECUTE IMMEDIATE 'SELECT ?' USING v1;
+        |END
+        |""".stripMargin)
+    checkAnswer(result, Row(5))
+  }
+
+  test("EXECUTE IMMEDIATE resolves session var in body and local var in 
USING") {
+    withSessionVariable("v1") {
+      spark.sql("DECLARE v1 = 10")
+      val result = spark.sql(
+        """
+          |BEGIN
+          |  DECLARE v2 = 20;
+          |  EXECUTE IMMEDIATE 'SELECT system.session.v1, ?' USING v2;
+          |END
+          |""".stripMargin)
+      checkAnswer(result, Row(10, 20))
+    }
+  }
+
+  test("EXECUTE IMMEDIATE fails when local var referenced in body alongside 
session var") {
+    withSessionVariable("v1") {
+      spark.sql("DECLARE v1 = 10")
+      val e = intercept[AnalysisException] {
+        spark.sql(
+          """
+            |BEGIN
+            |  DECLARE v2 = 20;
+            |  EXECUTE IMMEDIATE 'SELECT v1, v2';
+            |END
+            |""".stripMargin)
+      }
+      checkError(
+        exception = e,
+        condition = "UNRESOLVED_COLUMN.WITHOUT_SUGGESTION",
+        sqlState = "42703",
+        parameters = Map("objectName" -> "`v2`"),
+        context = ExpectedContext(
+          objectType = "EXECUTE IMMEDIATE",
+          objectName = "",
+          startIndex = 11,
+          stopIndex = 12,
+          fragment = "v2"))
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to