This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new fa872498ed6e [SPARK-55891][SQL] Preserve the SQL scripting context
inside EXECUTE IMMEDIATE
fa872498ed6e is described below
commit fa872498ed6e91ded378436ba233506839134fc7
Author: ilicmarkodb <[email protected]>
AuthorDate: Wed Mar 11 22:55:53 2026 +0800
[SPARK-55891][SQL] Preserve the SQL scripting context inside EXECUTE
IMMEDIATE
### What changes were proposed in this pull request?
In this PR, I propose preserving the SQL Scripting context inside `EXECUTE
IMMEDIATE`, instead of setting it to `null` while executing the command. I
propose persisting everything except the variable manager, since local
variables should not be resolvable inside the `EXECUTE IMMEDIATE` body.
### Why are the changes needed?
This is an improvement: previously the SQL scripting context was discarded entirely (set to `null`) while `EXECUTE IMMEDIATE` ran, losing all script state. With this change only the local variable manager is hidden — local variables must not be resolvable in the command body — while the rest of the scripting context is preserved.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
New tests.
### Was this patch authored or co-authored using generative AI tooling?
Yes, Claude.
Closes #54691 from ilicmarkodb/execute_imm_context.
Authored-by: ilicmarkodb <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../sql/catalyst/analysis/ResolveCatalogs.scala | 18 ++--
.../sql/catalyst/analysis/VariableResolution.scala | 2 +-
.../catalyst/analysis/resolver/ResolverGuard.scala | 2 +-
.../catalog/SqlScriptingContextManager.scala | 4 +-
.../analysis/ResolveExecuteImmediate.scala | 49 +++++++----
.../execution/command/v2/CreateVariableExec.scala | 2 +-
.../sql/execution/command/v2/FetchCursorExec.scala | 2 +-
.../sql/execution/command/v2/SetVariableExec.scala | 2 +-
.../command/v2/VariableAssignmentUtils.scala | 2 +-
.../scripting/SqlScriptingContextManagerImpl.scala | 2 +-
.../execution/ExecuteImmediateEndToEndSuite.scala | 99 +++++++++++++++++++++-
11 files changed, 152 insertions(+), 32 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
index 5fa8ffefc012..9459a043b095 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
@@ -45,9 +45,7 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
// We resolve only UnresolvedIdentifiers, and pass on the other nodes
val resolved = identifiers.map {
case UnresolvedIdentifier(nameParts, _) =>
- // From scripts we can only create local variables, which must be
unqualified,
- // and must not be DECLARE OR REPLACE.
- if (withinSqlScript) {
+ if (withinLocalVariableScope) {
if (c.replace) {
throw new AnalysisException(
"INVALID_VARIABLE_DECLARATION.REPLACE_LOCAL_VARIABLE",
@@ -61,7 +59,7 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
Map("varName" -> toSQLId(nameParts)))
}
- SqlScriptingContextManager.get().map(_.getVariableManager)
+ SqlScriptingContextManager.get().flatMap(_.getVariableManager)
.getOrElse(throw SparkException.internalError(
"Scripting local variable manager should be present in SQL
script."))
.qualify(nameParts.last)
@@ -77,7 +75,7 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
c.copy(names = resolved)
case d @ DropVariable(UnresolvedIdentifier(nameParts, _), _) =>
- if (withinSqlScript) {
+ if (withinLocalVariableScope) {
throw new AnalysisException(
"UNSUPPORTED_FEATURE.SQL_SCRIPTING_DROP_TEMPORARY_VARIABLE",
Map.empty)
}
@@ -203,8 +201,14 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
}
}
- private def withinSqlScript: Boolean =
- SqlScriptingContextManager.get().map(_.getVariableManager).isDefined
+ /**
+ * Whether we are within a local variable scope. This is true when we are
directly inside a
+ * SQL script and local variable rules apply (unqualified DECLARE only, no
OR REPLACE, no DROP).
+ * EXECUTE IMMEDIATE inside a script is not within a local variable scope,
since it works
+ * with session variables as if it were outside the script.
+ */
+ private def withinLocalVariableScope: Boolean =
+ SqlScriptingContextManager.get().flatMap(_.getVariableManager).isDefined
private def assertValidSessionVariableNameParts(
nameParts: Seq[String],
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/VariableResolution.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/VariableResolution.scala
index 2a1f35b0859d..0095885c0135 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/VariableResolution.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/VariableResolution.scala
@@ -108,7 +108,7 @@ class VariableResolution(tempVariableManager:
TempVariableManager) extends SQLCo
SqlScriptingContextManager
.get()
- .map(_.getVariableManager)
+ .flatMap(_.getVariableManager)
// If variable name is qualified with session.<varName> treat it as a
session variable.
.filterNot(
_ =>
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/ResolverGuard.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/ResolverGuard.scala
index 4639e0bb3dcd..e0d72a72a65e 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/ResolverGuard.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/ResolverGuard.scala
@@ -519,7 +519,7 @@ class ResolverGuard(catalogManager: CatalogManager) extends
SQLConfHelper {
catalogManager.tempVariableManager.isEmpty
private def checkScriptingVariables() =
-
SqlScriptingContextManager.get().map(_.getVariableManager).forall(_.isEmpty)
+
SqlScriptingContextManager.get().flatMap(_.getVariableManager).forall(_.isEmpty)
private def tryThrowUnsupportedSinglePassAnalyzerFeature(operator:
LogicalPlan): Unit = {
tryThrowUnsupportedSinglePassAnalyzerFeature(s"${operator.getClass}
operator resolution")
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SqlScriptingContextManager.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SqlScriptingContextManager.scala
index dcf38558afb7..196591b6d982 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SqlScriptingContextManager.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SqlScriptingContextManager.scala
@@ -27,7 +27,7 @@ trait SqlScriptingContextManager {
def getContext: SqlScriptingExecutionContextExtension
/**
- * Get variable manager
+ * Get the variable manager, if available.
*/
- def getVariableManager: VariableManager
+ def getVariableManager: Option[VariableManager]
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveExecuteImmediate.scala
b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveExecuteImmediate.scala
index 03c04482893b..bf57d8df5229 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveExecuteImmediate.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveExecuteImmediate.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.analysis
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.SqlScriptingContextManager
+import org.apache.spark.sql.catalyst.catalog.{SqlScriptingContextManager =>
SqlScriptingContextManagerTrait}
import org.apache.spark.sql.catalyst.expressions.{Alias, Expression,
VariableReference}
import org.apache.spark.sql.catalyst.plans.logical.{Command, CompoundBody,
LogicalPlan, SetVariable}
import org.apache.spark.sql.catalyst.rules.Rule
@@ -96,10 +97,10 @@ case class ResolveExecuteImmediate(sparkSession:
SparkSession, catalogManager: C
stopIndex = Some(sqlString.length - 1)
)
- // Execute the query recursively with isolated local variable context and
EXECUTE IMMEDIATE
- // origin. The isolation must cover parsing, analysis, and execution
phases.
- // CurrentOrigin.withOrigin ensures expressions created during parsing get
the proper context
- val result = withIsolatedLocalVariableContext {
+ // Execute the query with local variables hidden and EXECUTE IMMEDIATE
origin set.
+ // Both must cover parsing, analysis, and execution phases.
+ // CurrentOrigin.withOrigin ensures expressions created during parsing get
the proper context.
+ val result = withHiddenLocalVariables {
CurrentOrigin.withOrigin(executeImmediateOrigin) {
// Use shared parameterized query execution logic (same as OPEN CURSOR)
val df = if (args.isEmpty) {
@@ -121,9 +122,9 @@ case class ResolveExecuteImmediate(sparkSession:
SparkSession, catalogManager: C
throw QueryCompilationErrors.sqlScriptInExecuteImmediate(sqlString)
}
- // Force analysis to happen within the isolated context to ensure
local variables
- // are not accessible. This is critical because DataFrames are lazy
and analysis
- // would otherwise happen outside the isolation context.
+ // Force analysis to happen while local variables are hidden. This is
critical because
+ // DataFrames are lazy and analysis would otherwise happen after
withHiddenLocalVariables
+ // has restored the original context.
df.queryExecution.analyzed
df
}
@@ -185,14 +186,32 @@ case class ResolveExecuteImmediate(sparkSession:
SparkSession, catalogManager: C
}
/**
- * Temporarily isolates the SQL scripting context during EXECUTE IMMEDIATE
execution.
- * This makes withinSqlScript() return false, ensuring that statements
within EXECUTE IMMEDIATE
- * are not affected by the outer SQL script context (e.g., local variables,
script-specific
- * errors).
+ * Temporarily hides the local variable context while executing the `EXECUTE
IMMEDIATE` command.
+ * This is expected behavior, as local variables cannot be resolved within
the body of this
+ * command. This does not apply to session variables. The rest of the SQL
scripting context is
+ * preserved.
+ *
+ * {{{
+ * DECLARE VARIABLE v1 = 1; -- Session variable.
+ * BEGIN
+ * DECLARE v2 = 2; -- Local variable.
+ * EXECUTE IMMEDIATE 'SELECT v1'; -- Should work.
+ * EXECUTE IMMEDIATE 'SELECT v2'; -- Should fail.
+ * EXECUTE IMMEDIATE 'SELECT ?' USING v2; -- Should work.
+ * END
+ * }}}
*/
- private def withIsolatedLocalVariableContext[A](f: => A): A = {
- // Completely clear the SQL scripting context to make withinSqlScript()
return false
- val handle = SqlScriptingContextManager.create(null)
- handle.runWith(f)
+ private def withHiddenLocalVariables[A](f: => A): A = {
+ val newContextManager = SqlScriptingContextManager.get() match {
+ case Some(contextManager) =>
+ // SqlScriptingContextManagerTrait is
catalog.SqlScriptingContextManager, renamed on import
+ // to distinguish from the object of the same name in
org.apache.spark.sql.catalyst.
+ new SqlScriptingContextManagerTrait {
+ override def getContext = contextManager.getContext
+ override def getVariableManager = None
+ }
+ case None => null
+ }
+ SqlScriptingContextManager.create(newContextManager).runWith(f)
}
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/v2/CreateVariableExec.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/v2/CreateVariableExec.scala
index e625f02b8bbe..0f2777db2926 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/v2/CreateVariableExec.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/v2/CreateVariableExec.scala
@@ -39,7 +39,7 @@ case class CreateVariableExec(
replace: Boolean) extends LeafV2CommandExec with ExpressionsEvaluator {
override protected def run(): Seq[InternalRow] = {
- val scriptingVariableManager =
SqlScriptingContextManager.get().map(_.getVariableManager)
+ val scriptingVariableManager =
SqlScriptingContextManager.get().flatMap(_.getVariableManager)
val tempVariableManager =
session.sessionState.catalogManager.tempVariableManager
val exprs = prepareExpressions(Seq(defaultExpr.child),
subExprEliminationEnabled = false)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/v2/FetchCursorExec.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/v2/FetchCursorExec.scala
index b2c42d64c0a1..ad867e653767 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/v2/FetchCursorExec.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/v2/FetchCursorExec.scala
@@ -180,7 +180,7 @@ case class FetchCursorExec(
// Select the appropriate variable manager based on the catalog
// This logic matches SetVariableExec.setVariable()
val tempVariableManager =
session.sessionState.catalogManager.tempVariableManager
- val scriptingVariableManager =
SqlScriptingContextManager.get().map(_.getVariableManager)
+ val scriptingVariableManager =
SqlScriptingContextManager.get().flatMap(_.getVariableManager)
val variableManager = varRef.catalog match {
case FakeLocalCatalog if scriptingVariableManager.isEmpty =>
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/v2/SetVariableExec.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/v2/SetVariableExec.scala
index a5b64736a618..ef8e238832b3 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/v2/SetVariableExec.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/v2/SetVariableExec.scala
@@ -66,7 +66,7 @@ case class SetVariableExec(variables: Seq[VariableReference],
query: SparkPlan)
}
val tempVariableManager =
session.sessionState.catalogManager.tempVariableManager
- val scriptingVariableManager =
SqlScriptingContextManager.get().map(_.getVariableManager)
+ val scriptingVariableManager =
SqlScriptingContextManager.get().flatMap(_.getVariableManager)
val variableManager = variable.catalog match {
case FakeLocalCatalog if scriptingVariableManager.isEmpty =>
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/v2/VariableAssignmentUtils.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/v2/VariableAssignmentUtils.scala
index 98c597877e32..3a4d55169d90 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/v2/VariableAssignmentUtils.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/v2/VariableAssignmentUtils.scala
@@ -54,7 +54,7 @@ object VariableAssignmentUtils {
varRef.originalNameParts.map(_.toLowerCase(Locale.ROOT))
}
- val scriptingVariableManager =
SqlScriptingContextManager.get().map(_.getVariableManager)
+ val scriptingVariableManager =
SqlScriptingContextManager.get().flatMap(_.getVariableManager)
val variableManager = varRef.catalog match {
case FakeLocalCatalog if scriptingVariableManager.isEmpty =>
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/scripting/SqlScriptingContextManagerImpl.scala
b/sql/core/src/main/scala/org/apache/spark/sql/scripting/SqlScriptingContextManagerImpl.scala
index 884269d85f99..a4dbfca1d31a 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/scripting/SqlScriptingContextManagerImpl.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/scripting/SqlScriptingContextManagerImpl.scala
@@ -30,5 +30,5 @@ class SqlScriptingContextManagerImpl(context:
SqlScriptingExecutionContext)
override def getContext: SqlScriptingExecutionContextExtension = context
- override def getVariableManager: VariableManager = variableManager
+ override def getVariableManager: Option[VariableManager] =
Some(variableManager)
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExecuteImmediateEndToEndSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExecuteImmediateEndToEndSuite.scala
index c252047b3abe..a823c1bab5f3 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExecuteImmediateEndToEndSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExecuteImmediateEndToEndSuite.scala
@@ -16,7 +16,7 @@
*/
package org.apache.spark.sql.execution
-import org.apache.spark.sql.{AnalysisException, QueryTest}
+import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
import org.apache.spark.sql.test.SharedSparkSession
class ExecuteImmediateEndToEndSuite extends QueryTest with SharedSparkSession {
@@ -46,4 +46,101 @@ class ExecuteImmediateEndToEndSuite extends QueryTest with
SharedSparkSession {
condition = "SQL_SCRIPT_IN_EXECUTE_IMMEDIATE",
parameters = Map("sqlString" -> "BEGIN SELECT 1; END"))
}
+
+ test("EXECUTE IMMEDIATE resolves session variables in body") {
+ withSessionVariable("v1", "v2") {
+ spark.sql("DECLARE v1 = 42")
+ spark.sql("DECLARE v2 = 99")
+ checkAnswer(spark.sql("EXECUTE IMMEDIATE 'SELECT system.session.v1,
v2'"), Row(42, 99))
+ }
+ }
+
+ test("EXECUTE IMMEDIATE resolves session variables inside script") {
+ withSessionVariable("v1", "v2") {
+ spark.sql("DECLARE v1 = 10")
+ spark.sql("DECLARE v2 = 20")
+ val result = spark.sql(
+ """
+ |BEGIN
+ | DECLARE v3 = 1;
+ | EXECUTE IMMEDIATE 'SELECT system.session.v1, v2';
+ |END
+ |""".stripMargin)
+ checkAnswer(result, Row(10, 20))
+ }
+ }
+
+ test("EXECUTE IMMEDIATE does not resolve local variables") {
+ val result = intercept[AnalysisException] {
+ spark.sql(
+ """
+ |BEGIN
+ | DECLARE v1 = 5;
+ | EXECUTE IMMEDIATE 'SELECT v1';
+ |END
+ |""".stripMargin)
+ }
+ checkError(
+ exception = result,
+ condition = "UNRESOLVED_COLUMN.WITHOUT_SUGGESTION",
+ sqlState = "42703",
+ parameters = Map("objectName" -> "`v1`"),
+ context = ExpectedContext(
+ objectType = "EXECUTE IMMEDIATE",
+ objectName = "",
+ startIndex = 7,
+ stopIndex = 8,
+ fragment = "v1"))
+ }
+
+ test("EXECUTE IMMEDIATE resolves local variable in USING clause") {
+ val result = spark.sql(
+ """
+ |BEGIN
+ | DECLARE v1 = 5;
+ | EXECUTE IMMEDIATE 'SELECT ?' USING v1;
+ |END
+ |""".stripMargin)
+ checkAnswer(result, Row(5))
+ }
+
+ test("EXECUTE IMMEDIATE resolves session var in body and local var in
USING") {
+ withSessionVariable("v1") {
+ spark.sql("DECLARE v1 = 10")
+ val result = spark.sql(
+ """
+ |BEGIN
+ | DECLARE v2 = 20;
+ | EXECUTE IMMEDIATE 'SELECT system.session.v1, ?' USING v2;
+ |END
+ |""".stripMargin)
+ checkAnswer(result, Row(10, 20))
+ }
+ }
+
+ test("EXECUTE IMMEDIATE fails when local var referenced in body alongside
session var") {
+ withSessionVariable("v1") {
+ spark.sql("DECLARE v1 = 10")
+ val e = intercept[AnalysisException] {
+ spark.sql(
+ """
+ |BEGIN
+ | DECLARE v2 = 20;
+ | EXECUTE IMMEDIATE 'SELECT v1, v2';
+ |END
+ |""".stripMargin)
+ }
+ checkError(
+ exception = e,
+ condition = "UNRESOLVED_COLUMN.WITHOUT_SUGGESTION",
+ sqlState = "42703",
+ parameters = Map("objectName" -> "`v2`"),
+ context = ExpectedContext(
+ objectType = "EXECUTE IMMEDIATE",
+ objectName = "",
+ startIndex = 11,
+ stopIndex = 12,
+ fragment = "v2"))
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]