This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new f92601ce879b [SPARK-53444][SQL] Rework execute immediate
f92601ce879b is described below

commit f92601ce879b1f2220387d2f720b533d5ff08e21
Author: Serge Rielau <se...@rielau.com>
AuthorDate: Sat Sep 13 21:32:48 2025 +0800

    [SPARK-53444][SQL] Rework execute immediate
    
    ### What changes were proposed in this pull request?
    
    This PR significantly reworks the existing `EXECUTE IMMEDIATE` SQL
    statement implementation to improve separation of concerns, robustness,
    and functionality:
    
    1. **Architecture Redesign**:
       - Refactored from direct execution to a nested invocation model using 
`sparkSession.sql()`
       - Improved separation between analysis-time validation and 
execution-time processing
       - Enhanced isolation between outer and inner query contexts
    
    2. **Enhanced Query String Support**:
       - **NEW**: Support for constant expressions as the SQL statement
         string (previously only string literals or variable references)
    
    3. **Improved Parameter Handling**:
       - Redesigned parameter processing with unified parameter arrays
       - Enhanced validation against mixing positional and named parameters
         (see the sketch below)
       - Better error messages with `UNSUPPORTED_EXPR_FOR_PARAMETER` for 
invalid expressions
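    
    A sketch of the validation rule (table, column, and parameter names are
    hypothetical):
    
    ```sql
    -- OK: all parameters positional
    EXECUTE IMMEDIATE 'SELECT * FROM tbl WHERE id = ?' USING 42;
    
    -- OK: all parameters named
    EXECUTE IMMEDIATE 'SELECT * FROM tbl WHERE id = :id' USING 42 AS id;
    
    -- Rejected with INVALID_QUERY_MIXED_QUERY_PARAMETERS: positional and
    -- named markers mixed in one statement
    EXECUTE IMMEDIATE 'SELECT 42 WHERE ? = :first' USING 1, 2 AS first;
    ```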
    
    4. **Robust SQL Scripting Integration**:
       - **NEW**: Complete isolation of local variables from nested
         `EXECUTE IMMEDIATE` calls (see the sketch below)
       - Preserved session variable functionality within `EXECUTE IMMEDIATE`
       - **NEW**: Proper command vs. query result distinction (commands don't 
produce result sets)
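    
    A sketch of the isolation behavior (hypothetical names; assumes Spark's
    `DECLARE` syntax for session and script-local variables):
    
    ```sql
    DECLARE stmt STRING DEFAULT 'SELECT 42';  -- session variable
    BEGIN
      DECLARE local_var INT DEFAULT 5;        -- script-local variable
      EXECUTE IMMEDIATE stmt;                 -- OK: session variables remain visible
      EXECUTE IMMEDIATE 'SELECT local_var';   -- fails: local variables are isolated
    END;
    ```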
    
    5. **Enhanced Error Handling**:
       - Removed unnecessary exception wrapping for cleaner error propagation
       - **NEW**: Multi-level context error reporting showing both outer and 
inner query context
       - Comprehensive validation with appropriate error classes
    
    6. **Code Quality Improvements**:
       - Eliminated code duplication in parameter validation logic
       - Consolidated imports and cleaned up unused code
       - Improved method naming and documentation
    
    Open issues:
    The code that detects whether a query uses named or positional parameter
    markers is not yet robust and needs more work.
    
    ### Why are the changes needed?
    
    The existing implementation is brittle and cannot handle transitive
    closure (e.g. nested EXECUTE IMMEDIATE, or SQL Scripting inside
    EXECUTE IMMEDIATE).
    There is also a follow-on PR for improved parameter substitution that is
    blocked from supporting EXECUTE IMMEDIATE until this PR is delivered.
    Lastly, this change lays the foundation for nesting/chaining error
    contexts into SQL-level stacks.
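    
    For example, a nested invocation like the following was previously
    rejected with `NESTED_EXECUTE_IMMEDIATE` and is now handled through the
    nested `sparkSession.sql()` invocation model:
    
    ```sql
    EXECUTE IMMEDIATE 'EXECUTE IMMEDIATE ''SELECT 42''';
    ```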
    
    ### Does this PR introduce _any_ user-facing change?
    
    **Enhanced Functionality (Backward Compatible):**
    
    ```sql
    -- NEW: Expression concatenation
    EXECUTE IMMEDIATE 'SELECT * FROM ' || table_name || ' WHERE active = true';
    
    -- EXISTING: All previous syntax continues to work
    EXECUTE IMMEDIATE 'SELECT * FROM table_name';
    EXECUTE IMMEDIATE 'SELECT * FROM table_name WHERE id = ?' USING 123;
    EXECUTE IMMEDIATE 'SELECT name FROM users WHERE id = ?' INTO user_name 
USING 123;
    ```
    
    A couple of error conditions now return more general codes due to improved 
orthogonality.
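    
    For example, passing a non-string expression as the statement string now
    reports the expression-oriented condition (a sketch; the variable name is
    made up):
    
    ```sql
    DECLARE int_var INT DEFAULT 42;
    -- Previously: INVALID_VARIABLE_TYPE_FOR_QUERY_EXECUTE_IMMEDIATE
    -- Now:        INVALID_EXPR_TYPE_FOR_QUERY_EXECUTE_IMMEDIATE
    EXECUTE IMMEDIATE int_var;
    ```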
    
    All existing valid `EXECUTE IMMEDIATE` statements continue to work as 
before.
    
    ### How was this patch tested?
    
    1. **Regression Testing**:
       - All existing `EXECUTE IMMEDIATE` tests continue to pass
       - Verified backward compatibility with existing syntax and behavior
    
    2. **Enhanced Test Coverage**:
       - **NEW**: Tests for variable references and expression concatenation in 
query strings
       - **NEW**: Tests for proper local variable isolation in SQL scripting 
contexts
       - **NEW**: Tests for command vs. query result handling
       - **EXPANDED**: Error condition testing with new validation logic
    
    3. **Integration Testing**:
       - `SqlScriptingExecutionSuite` tests for SQL scripting integration
       - Golden file tests (`execute-immediate.sql`) for analyzer and execution 
results
       - Cross-feature testing with variables, parameters, and nested contexts
    
    4. **Error Handling Validation**:
       - Comprehensive testing of all error classes and messages
       - Validation that exceptions propagate naturally without wrapping
       - Multi-level error context testing
    
    The reworked implementation passes all existing tests plus extensive new 
test coverage, ensuring both backward compatibility and enhanced functionality.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Generated-by: Claude 3.5 Sonnet (Anthropic) - Used for code review, 
refactoring suggestions, and implementation guidance. All core architectural 
decisions, SQL semantics, and implementation logic were designed and developed 
by human developers.
    
    Closes #52173 from srielau/rework-execute-immediate.
    
    Lead-authored-by: Serge Rielau <se...@rielau.com>
    Co-authored-by: Serge Rielau <srie...@users.noreply.github.com>
    Co-authored-by: Wenchen Fan <cloud0...@gmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../src/main/resources/error/error-conditions.json |  12 +-
 .../spark/sql/catalyst/parser/SqlBaseParser.g4     |  15 +-
 .../spark/sql/catalyst/analysis/Analyzer.scala     |  18 --
 .../catalyst/analysis/ColumnResolutionHelper.scala |   1 -
 .../sql/catalyst/analysis/ResolveCatalogs.scala    |   3 +-
 .../sql/catalyst/analysis/ResolveSetVariable.scala |  36 ++-
 .../sql/catalyst/analysis/VariableResolution.scala |  22 +-
 .../sql/catalyst/analysis/executeImmediate.scala   | 218 ----------------
 .../spark/sql/catalyst/analysis/parameters.scala   |  87 ++++++-
 .../spark/sql/catalyst/analysis/unresolved.scala   |  18 +-
 .../spark/sql/catalyst/parser/AstBuilder.scala     |  13 +-
 .../sql/catalyst/rules/RuleIdCollection.scala      |   2 +-
 .../spark/sql/errors/QueryCompilationErrors.scala  |  12 +-
 .../sql/catalyst/analysis/AnalysisErrorSuite.scala |  78 ------
 .../sql/catalyst/analysis/AnalysisSuite.scala      |  32 +--
 .../analysis/ResolveExecuteImmediate.scala         | 208 +++++++++++++++
 .../apache/spark/sql/classic/SparkSession.scala    |  43 ++-
 .../spark/sql/execution/command/SetCommand.scala   |   5 +-
 .../sql/internal/BaseSessionStateBuilder.scala     |   3 +-
 .../analyzer-results/execute-immediate.sql.out     | 289 +++++++++++++++++----
 .../parse-query-correctness-old-behavior.sql.out   | 150 ++++++++---
 .../sql-tests/inputs/execute-immediate.sql         |  43 ++-
 .../sql-tests/results/execute-immediate.sql.out    | 239 ++++++++++++++---
 .../parse-query-correctness-old-behavior.sql.out   |  80 +++++-
 .../spark/sql/errors/QueryParsingErrorsSuite.scala |  16 +-
 .../spark/sql/hive/HiveSessionStateBuilder.scala   |   3 +-
 26 files changed, 1086 insertions(+), 560 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-conditions.json 
b/common/utils/src/main/resources/error/error-conditions.json
index 8bbf24074f35..06e6c2f14d86 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -2742,6 +2742,12 @@
     ],
     "sqlState" : "42001"
   },
+  "INVALID_EXPR_TYPE_FOR_QUERY_EXECUTE_IMMEDIATE" : {
+    "message" : [
+      "Expression type must be string type but got <exprType>."
+    ],
+    "sqlState" : "42K09"
+  },
   "INVALID_EXTERNAL_TYPE" : {
     "message" : [
       "The external type <externalType> is not valid for the type <type> at 
the expression <expr>."
@@ -3919,12 +3925,6 @@
     },
     "sqlState" : "42K0M"
   },
-  "INVALID_VARIABLE_TYPE_FOR_QUERY_EXECUTE_IMMEDIATE" : {
-    "message" : [
-      "Variable type must be string type but got <varType>."
-    ],
-    "sqlState" : "42K09"
-  },
   "INVALID_VARIANT_CAST" : {
     "message" : [
       "The variant value `<value>` cannot be cast into `<dataType>`. Please 
use `try_variant_get` instead."
diff --git 
a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 
b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
index e63a229a3207..8efab99d4ec8 100644
--- 
a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
+++ 
b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
@@ -397,7 +397,7 @@ setResetStatement
     ;
 
 executeImmediate
-    : EXECUTE IMMEDIATE queryParam=executeImmediateQueryParam (INTO 
targetVariable=multipartIdentifierList)? executeImmediateUsing?
+    : EXECUTE IMMEDIATE queryParam=expression (INTO 
targetVariable=multipartIdentifierList)? executeImmediateUsing?
     ;
 
 executeImmediateUsing
@@ -405,19 +405,6 @@ executeImmediateUsing
     | USING params=namedExpressionSeq
     ;
 
-executeImmediateQueryParam
-    : stringLit
-    | multipartIdentifier
-    ;
-
-executeImmediateArgument
-    : (constant|multipartIdentifier) (AS name=errorCapturingIdentifier)?
-    ;
-
-executeImmediateArgumentSeq
-    : executeImmediateArgument (COMMA executeImmediateArgument)*
-    ;
-
 timezone
     : stringLit
     | LOCAL
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 4a360fa1da7e..3cef19cbad53 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -135,9 +135,6 @@ object FakeV2SessionCatalog extends TableCatalog with 
FunctionCatalog with Suppo
  *                              if `t` was a permanent table when the current 
view was created, it
  *                              should still be a permanent table when 
resolving the current view,
  *                              even if a temp view `t` has been created.
- * @param isExecuteImmediate Whether the current plan is created by EXECUTE 
IMMEDIATE. Used when
- *                           resolving variables, as SQL Scripting local 
variables should not be
- *                           visible from EXECUTE IMMEDIATE.
  * @param outerPlan The query plan from the outer query that can be used to 
resolve star
  *                  expressions in a subquery.
  */
@@ -155,7 +152,6 @@ case class AnalysisContext(
     referredTempFunctionNames: mutable.Set[String] = mutable.Set.empty,
     referredTempVariableNames: Seq[Seq[String]] = Seq.empty,
     outerPlan: Option[LogicalPlan] = None,
-    isExecuteImmediate: Boolean = false,
     collation: Option[String] = None,
 
     /**
@@ -212,19 +208,11 @@ object AnalysisContext {
       viewDesc.viewReferredTempViewNames,
       mutable.Set(viewDesc.viewReferredTempFunctionNames: _*),
       viewDesc.viewReferredTempVariableNames,
-      isExecuteImmediate = originContext.isExecuteImmediate,
       collation = viewDesc.collation)
     set(context)
     try f finally { set(originContext) }
   }
 
-  def withExecuteImmediateContext[A](f: => A): A = {
-    val originContext = value.get()
-    val context = originContext.copy(isExecuteImmediate = true)
-
-    set(context)
-    try f finally { set(originContext) }
-  }
 
   def withNewAnalysisContext[A](f: => A): A = {
     val originContext = value.get()
@@ -495,10 +483,6 @@ class Analyzer(override val catalogManager: 
CatalogManager) extends RuleExecutor
       RewriteMergeIntoTable ::
       MoveParameterizedQueriesDown ::
       BindParameters ::
-      new SubstituteExecuteImmediate(
-        catalogManager,
-        resolveChild = executeSameContext,
-        checkAnalysis = checkAnalysis) ::
       typeCoercionRules() ++
       Seq(
         ResolveWithCTE,
@@ -1802,8 +1786,6 @@ class Analyzer(override val catalogManager: 
CatalogManager) extends RuleExecutor
       case s: Sort if !s.resolved || s.missingInput.nonEmpty =>
         resolveReferencesInSort(s)
 
-      // Pass for Execute Immediate as arguments will be resolved by 
[[SubstituteExecuteImmediate]].
-      case e : ExecuteImmediateQuery => e
 
       case d: DataFrameDropColumns if !d.resolved =>
         resolveDataFrameDropColumns(d)
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala
index bade5f0bee9d..3224ccafafec 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala
@@ -241,7 +241,6 @@ trait ColumnResolutionHelper extends Logging with 
DataTypeErrorsBase {
       variableResolution.resolveMultipartName(
         nameParts = nameParts,
         resolvingView = AnalysisContext.get.catalogAndNamespace.nonEmpty,
-        resolvingExecuteImmediate = AnalysisContext.get.isExecuteImmediate,
         referredTempVariableNames = 
AnalysisContext.get.referredTempVariableNames
       ).map(e => Alias(e, nameParts.last)())
     }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
index 851db598c4e3..6307ccd5b975 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
@@ -142,8 +142,7 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
   }
 
   private def withinSqlScript: Boolean =
-    SqlScriptingContextManager.get().map(_.getVariableManager).isDefined &&
-      !AnalysisContext.get.isExecuteImmediate
+    SqlScriptingContextManager.get().map(_.getVariableManager).isDefined
 
   private def assertValidSessionVariableNameParts(
       nameParts: Seq[String],
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSetVariable.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSetVariable.scala
index ab4408435767..4b16448641bc 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSetVariable.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSetVariable.scala
@@ -35,6 +35,22 @@ class ResolveSetVariable(val catalogManager: CatalogManager) 
extends Rule[Logica
   with ColumnResolutionHelper {
   private val variableResolution = new 
VariableResolution(catalogManager.tempVariableManager)
 
+  /**
+   * Checks for duplicate variable names and throws an exception if found.
+   * Names are normalized when the variables are created.
+   * No need for case insensitive comparison here.
+   */
+  private def checkForDuplicateVariables(variables: Seq[VariableReference]): 
Unit = {
+    // TODO: we need to group by the qualified variable name once other 
catalogs support it.
+    val dups = variables.groupBy(_.identifier).filter(kv => kv._2.length > 1)
+    if (dups.nonEmpty) {
+      throw new AnalysisException(
+        errorClass = "DUPLICATE_ASSIGNMENTS",
+        messageParameters = Map("nameList" ->
+          dups.keys.map(key => toSQLId(key.name())).mkString(", ")))
+    }
+  }
+
   override def apply(plan: LogicalPlan): LogicalPlan = 
plan.resolveOperatorsWithPruning(
     _.containsPattern(COMMAND), ruleId) {
     // Resolve the left hand side of the SET VAR command
@@ -42,8 +58,7 @@ class ResolveSetVariable(val catalogManager: CatalogManager) 
extends Rule[Logica
       val resolvedVars = setVariable.targetVariables.map {
         case u: UnresolvedAttribute =>
           variableResolution.lookupVariable(
-            nameParts = u.nameParts,
-            resolvingExecuteImmediate = AnalysisContext.get.isExecuteImmediate
+            nameParts = u.nameParts
           ) match {
             case Some(variable) => variable.copy(canFold = false)
             case _ => throw unresolvedVariableError(u.nameParts, Seq("SYSTEM", 
"SESSION"))
@@ -53,24 +68,17 @@ class ResolveSetVariable(val catalogManager: 
CatalogManager) extends Rule[Logica
           "Unexpected target variable expression in SetVariable: " + other)
       }
 
-      // Protect against duplicate variable names
-      // Names are normalized when the variables are created.
-      // No need for case insensitive comparison here.
-      // TODO: we need to group by the qualified variable name once other 
catalogs support it.
-      val dups = resolvedVars.groupBy(_.identifier).filter(kv => kv._2.length 
> 1)
-      if (dups.nonEmpty) {
-        throw new AnalysisException(
-          errorClass = "DUPLICATE_ASSIGNMENTS",
-          messageParameters = Map("nameList" ->
-            dups.keys.map(key => toSQLId(key.name())).mkString(", ")))
-      }
-
       setVariable.copy(targetVariables = resolvedVars)
 
     case setVariable: SetVariable
         if 
setVariable.targetVariables.forall(_.isInstanceOf[VariableReference]) &&
           setVariable.sourceQuery.resolved =>
       val targetVariables = 
setVariable.targetVariables.map(_.asInstanceOf[VariableReference])
+
+      // Check for duplicate variable names - this handles both regular SET 
VAR (after resolution)
+      // and EXECUTE IMMEDIATE ... INTO (which comes pre-resolved)
+      checkForDuplicateVariables(targetVariables)
+
       val withCasts = TableOutputResolver.resolveVariableOutputColumns(
         targetVariables, setVariable.sourceQuery, conf)
       val withLimit = if (withCasts.maxRows.exists(_ <= 2)) {
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/VariableResolution.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/VariableResolution.scala
index 72af7c619a08..2a1f35b0859d 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/VariableResolution.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/VariableResolution.scala
@@ -52,9 +52,6 @@ class VariableResolution(tempVariableManager: 
TempVariableManager) extends SQLCo
    *   (e.g., ["catalog", "schema", "variable", "field1", "field2"])
    * @param resolvingView Whether this resolution is happening within a view 
context.
    *   When true, only variables explicitly referred to in the view definition 
are accessible.
-   * @param resolvingExecuteImmediate Whether this resolution is happening 
within an
-   *   EXECUTE IMMEDIATE context. When true, local variables are not 
accessible, only session
-   *   variables.
    * @param referredTempVariableNames When resolving within a view, this 
contains the list of
    *   variable names that the view explicitly references and should have 
access to.
    *
@@ -65,7 +62,6 @@ class VariableResolution(tempVariableManager: 
TempVariableManager) extends SQLCo
   def resolveMultipartName(
       nameParts: Seq[String],
       resolvingView: Boolean,
-      resolvingExecuteImmediate: Boolean,
       referredTempVariableNames: Seq[Seq[String]]): Option[Expression] = {
     var resolvedVariable: Option[Expression] = None
     // We only support temp variables for now, so the variable name can at 
most have 3 parts.
@@ -76,7 +72,6 @@ class VariableResolution(tempVariableManager: 
TempVariableManager) extends SQLCo
       resolvedVariable = resolveVariable(
         nameParts = nameParts.dropRight(numInnerFields),
         resolvingView = resolvingView,
-        resolvingExecuteImmediate = resolvingExecuteImmediate,
         referredTempVariableNames = referredTempVariableNames
       )
 
@@ -99,16 +94,12 @@ class VariableResolution(tempVariableManager: 
TempVariableManager) extends SQLCo
 
   /**
    * Look up variable by nameParts.
-   * If in SQL Script, first check local variables, unless in EXECUTE IMMEDIATE
-   * (EXECUTE IMMEDIATE generated query cannot access local variables).
-   * if not found fall back to session variables.
+   * If in SQL Script, first check local variables.
+   * If not found fall back to session variables.
    * @param nameParts NameParts of the variable.
-   * @param resolvingExecuteImmediate Whether the current context is in 
EXECUTE IMMEDIATE.
    * @return Reference to the variable.
    */
-  def lookupVariable(
-      nameParts: Seq[String],
-      resolvingExecuteImmediate: Boolean): Option[VariableReference] = {
+  def lookupVariable(nameParts: Seq[String]): Option[VariableReference] = {
     val namePartsCaseAdjusted = if (conf.caseSensitiveAnalysis) {
       nameParts
     } else {
@@ -118,8 +109,6 @@ class VariableResolution(tempVariableManager: 
TempVariableManager) extends SQLCo
     SqlScriptingContextManager
       .get()
       .map(_.getVariableManager)
-      // If we are in EXECUTE IMMEDIATE lookup only session variables.
-      .filterNot(_ => resolvingExecuteImmediate)
       // If variable name is qualified with session.<varName> treat it as a 
session variable.
       .filterNot(
         _ =>
@@ -156,16 +145,15 @@ class VariableResolution(tempVariableManager: 
TempVariableManager) extends SQLCo
   private def resolveVariable(
       nameParts: Seq[String],
       resolvingView: Boolean,
-      resolvingExecuteImmediate: Boolean,
       referredTempVariableNames: Seq[Seq[String]]): Option[Expression] = {
     if (resolvingView) {
       if (referredTempVariableNames.contains(nameParts)) {
-        lookupVariable(nameParts = nameParts, resolvingExecuteImmediate = 
resolvingExecuteImmediate)
+        lookupVariable(nameParts = nameParts)
       } else {
         None
       }
     } else {
-      lookupVariable(nameParts = nameParts, resolvingExecuteImmediate = 
resolvingExecuteImmediate)
+      lookupVariable(nameParts = nameParts)
     }
   }
 
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/executeImmediate.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/executeImmediate.scala
deleted file mode 100644
index b926cdf57f16..000000000000
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/executeImmediate.scala
+++ /dev/null
@@ -1,218 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.catalyst.analysis
-
-import scala.util.{Either, Left, Right}
-
-import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, 
VariableReference}
-import org.apache.spark.sql.catalyst.parser.ParseException
-import org.apache.spark.sql.catalyst.plans.logical.{CompoundBody, LogicalPlan, 
SetVariable}
-import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.catalyst.trees.TreePattern.{EXECUTE_IMMEDIATE, 
TreePattern}
-import org.apache.spark.sql.connector.catalog.CatalogManager
-import org.apache.spark.sql.errors.QueryCompilationErrors
-import org.apache.spark.sql.types.StringType
-
-/**
- * Logical plan representing execute immediate query.
- *
- * @param args parameters of query
- * @param query query string or variable
- * @param targetVariables variables to store the result of the query
- */
-case class ExecuteImmediateQuery(
-    args: Seq[Expression],
-    query: Either[String, UnresolvedAttribute],
-    targetVariables: Seq[UnresolvedAttribute])
-  extends UnresolvedLeafNode {
-  final override val nodePatterns: Seq[TreePattern] = Seq(EXECUTE_IMMEDIATE)
-}
-
-/**
- * This rule substitutes execute immediate query node with fully analyzed
- * plan that is passed as string literal or session parameter.
- */
-class SubstituteExecuteImmediate(
-    val catalogManager: CatalogManager,
-    resolveChild: LogicalPlan => LogicalPlan,
-    checkAnalysis: LogicalPlan => Unit)
-  extends Rule[LogicalPlan] {
-  private val variableResolution = new 
VariableResolution(catalogManager.tempVariableManager)
-
-  def resolveVariable(e: Expression): Expression = {
-
-    /**
-     * We know that the expression is either UnresolvedAttribute, Alias or 
Parameter, as passed from
-     * the parser. If it is an UnresolvedAttribute, we look it up in the 
catalog and return it. If
-     * it is an Alias, we resolve the child and return an Alias with the same 
name. If it is
-     * a Parameter, we leave it as is because the parameter belongs to another 
parameterized
-     * query and should be resolved later.
-     */
-    e match {
-      case u: UnresolvedAttribute =>
-        getVariableReference(u, u.nameParts)
-      case a: Alias =>
-        Alias(resolveVariable(a.child), a.name)()
-      case p: Parameter => p
-      case other =>
-        throw QueryCompilationErrors.unsupportedParameterExpression(other)
-    }
-  }
-
-  def resolveArguments(expressions: Seq[Expression]): Seq[Expression] = {
-    expressions.map { exp =>
-      if (exp.resolved) {
-        exp
-      } else {
-        resolveVariable(exp)
-      }
-    }
-  }
-
-  def extractQueryString(either: Either[String, UnresolvedAttribute]): String 
= {
-    either match {
-      case Left(v) => v
-      case Right(u) =>
-        val varReference = getVariableReference(u, u.nameParts)
-
-        if (!varReference.dataType.sameType(StringType)) {
-          throw 
QueryCompilationErrors.invalidExecuteImmediateVariableType(varReference.dataType)
-        }
-
-        // Call eval with null value passed instead of a row.
-        // This is ok as this is variable and invoking eval should
-        // be independent of row value.
-        val varReferenceValue = varReference.eval(null)
-
-        if (varReferenceValue == null) {
-          throw QueryCompilationErrors.nullSQLStringExecuteImmediate(u.name)
-        }
-
-        varReferenceValue.toString
-    }
-  }
-
-  override def apply(plan: LogicalPlan): LogicalPlan =
-    plan.resolveOperatorsWithPruning(_.containsPattern(EXECUTE_IMMEDIATE), 
ruleId) {
-      case e @ ExecuteImmediateQuery(expressions, _, _) if 
expressions.exists(!_.resolved) =>
-        e.copy(args = resolveArguments(expressions))
-
-      case ExecuteImmediateQuery(expressions, query, targetVariables)
-        if expressions.forall(_.resolved) =>
-
-        val queryString = extractQueryString(query)
-        val plan = parseStatement(queryString, targetVariables)
-
-        val posNodes = plan.collect { case p: LogicalPlan =>
-          p.expressions.flatMap(_.collect { case n: PosParameter => n })
-        }.flatten
-        val namedNodes = plan.collect { case p: LogicalPlan =>
-          p.expressions.flatMap(_.collect { case n: NamedParameter => n })
-        }.flatten
-
-        val queryPlan = if (expressions.isEmpty || (posNodes.isEmpty && 
namedNodes.isEmpty)) {
-          plan
-        } else if (posNodes.nonEmpty && namedNodes.nonEmpty) {
-          throw QueryCompilationErrors.invalidQueryMixedQueryParameters()
-        } else {
-          if (posNodes.nonEmpty) {
-            PosParameterizedQuery(plan, expressions)
-          } else {
-            val aliases = expressions.collect {
-              case e: Alias => e
-              case u: VariableReference => Alias(u, u.identifier.name())()
-            }
-
-            if (aliases.size != expressions.size) {
-              val nonAliases = expressions.filter(attr =>
-                !attr.isInstanceOf[Alias] && 
!attr.isInstanceOf[VariableReference])
-
-              throw 
QueryCompilationErrors.invalidQueryAllParametersMustBeNamed(nonAliases)
-            }
-
-            NameParameterizedQuery(
-              plan,
-              aliases.map(_.name),
-              // We need to resolve arguments before Resolution batch to make 
sure
-              // that some rule does not accidentally resolve our parameters.
-              // We do not want this as they can resolve some unsupported 
parameters.
-              aliases)
-          }
-        }
-
-        // Fully analyze the generated plan. 
AnalysisContext.withExecuteImmediateContext makes sure
-        // that SQL scripting local variables will not be accessed from the 
plan.
-        val finalPlan = AnalysisContext.withExecuteImmediateContext {
-          resolveChild(queryPlan)
-        }
-        checkAnalysis(finalPlan)
-
-        if (targetVariables.nonEmpty) {
-          SetVariable(targetVariables, finalPlan)
-        } else { finalPlan }
-    }
-
-  private def parseStatement(
-      queryString: String,
-      targetVariables: Seq[Expression]): LogicalPlan = {
-    // If targetVariables is defined, statement needs to be a query.
-    // Otherwise, it can be anything.
-    val plan = if (targetVariables.nonEmpty) {
-      try {
-        catalogManager.v1SessionCatalog.parser.parseQuery(queryString)
-      } catch {
-        case e: ParseException =>
-          // Since we do not have a way of telling that parseQuery failed 
because of
-          // actual parsing error or because statement was passed where query 
was expected,
-          // we need to make sure that parsePlan wouldn't throw
-          catalogManager.v1SessionCatalog.parser.parsePlan(queryString)
-
-          // Plan was successfully parsed, but query wasn't - throw.
-          throw 
QueryCompilationErrors.invalidStatementForExecuteInto(queryString)
-      }
-    } else {
-      catalogManager.v1SessionCatalog.parser.parsePlan(queryString)
-    }
-
-    if (plan.isInstanceOf[CompoundBody]) {
-      throw QueryCompilationErrors.sqlScriptInExecuteImmediate(queryString)
-    }
-
-    // do not allow nested execute immediate
-    if (plan.containsPattern(EXECUTE_IMMEDIATE)) {
-      throw QueryCompilationErrors.nestedExecuteImmediate(queryString)
-    }
-
-    plan
-  }
-
-  private def getVariableReference(expr: Expression, nameParts: Seq[String]): 
VariableReference = {
-    variableResolution.lookupVariable(
-      nameParts = nameParts,
-      resolvingExecuteImmediate = AnalysisContext.get.isExecuteImmediate
-    ) match {
-      case Some(variable) => variable
-      case _ =>
-        throw QueryCompilationErrors
-          .unresolvedVariableError(
-            nameParts,
-            Seq(CatalogManager.SYSTEM_CATALOG_NAME, 
CatalogManager.SESSION_NAMESPACE),
-            expr.origin)
-    }
-  }
-}
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/parameters.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/parameters.scala
index 2cfc2a8c90dc..b3200be95f69 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/parameters.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/parameters.scala
@@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, 
CreateArray, CreateMap,
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, 
SupervisingCommand}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.TreePattern.{COMMAND, PARAMETER, 
PARAMETERIZED_QUERY, TreePattern, UNRESOLVED_WITH}
-import org.apache.spark.sql.errors.QueryErrorsBase
+import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryErrorsBase}
 import org.apache.spark.sql.types.DataType
 
 sealed trait Parameter extends LeafExpression with Unevaluable {
@@ -104,6 +104,31 @@ case class PosParameterizedQuery(child: LogicalPlan, args: 
Seq[Expression])
     copy(child = newChild)
 }
 
+/**
+ * The logical plan representing a parameterized query with general parameter 
support.
+ * This allows the query to use either positional or named parameters based on 
the
+ * parameter markers found in the query, with optional parameter names 
provided.
+ *
+ * @param child The parameterized logical plan.
+ * @param args The literal values or collection constructor functions such as 
`map()`,
+ *             `array()`, `struct()` of parameters.
+ * @param paramNames Optional parameter names corresponding to args. If 
provided for an argument,
+ *                   that argument can be used for named parameter binding. If 
not provided
+ *                   parameters are treated as positional.
+ */
+case class GeneralParameterizedQuery(
+    child: LogicalPlan,
+    args: Seq[Expression],
+    paramNames: Seq[String])
+  extends ParameterizedQuery(child) {
+  assert(args.nonEmpty)
+  assert(paramNames.length == args.length,
+    s"paramNames must be same length as args. " +
+    s"paramNames.length=${paramNames.length}, args.length=${args.length}")
+  override protected def withNewChildInternal(newChild: LogicalPlan): 
LogicalPlan =
+    copy(child = newChild)
+}
+
 /**
  * Moves `ParameterizedQuery` inside `SupervisingCommand` for their supervised 
plans to be
  * resolved later by the analyzer.
@@ -143,7 +168,7 @@ object MoveParameterizedQueriesDown extends 
Rule[LogicalPlan] {
 }
 
 /**
- * Finds all named parameters in `ParameterizedQuery` and substitutes them by 
literals or
+ * Binds all named parameters in `ParameterizedQuery` and substitutes them by 
literals or
  * by collection constructor functions such as `map()`, `array()`, `struct()`
  * from the user-specified arguments.
  */
@@ -191,6 +216,7 @@ object BindParameters extends Rule[LogicalPlan] with 
QueryErrorsBase {
       case PosParameterizedQuery(child, args)
         if !child.containsPattern(UNRESOLVED_WITH) &&
           args.forall(_.resolved) =>
+
         val indexedArgs = args.zipWithIndex
         checkArgs(indexedArgs.map(arg => (s"_${arg._2}", arg._1)))
 
@@ -203,6 +229,63 @@ object BindParameters extends Rule[LogicalPlan] with 
QueryErrorsBase {
             args(posToIndex(pos))
         }
 
+      case GeneralParameterizedQuery(child, args, paramNames)
+        if !child.containsPattern(UNRESOLVED_WITH) &&
+          args.forall(_.resolved) =>
+
+        // Check all arguments for validity (args are already evaluated 
expressions/literals)
+        val allArgs = args.zip(paramNames).zipWithIndex.map { case ((arg, 
name), index) =>
+          val finalName = if (name.isEmpty) s"_$index" else name
+          finalName -> arg
+        }
+        checkArgs(allArgs)
+
+        // Collect parameter types used in the query to enforce invariants
+        var hasNamedParam = false
+        val positionalParams = scala.collection.mutable.Set.empty[Int]
+        bind(child) {
+          case p @ NamedParameter(_) => hasNamedParam = true; p
+          case p @ PosParameter(pos) => positionalParams.add(pos); p
+        }
+
+        // Validate: no mixing of positional and named parameters
+        if (hasNamedParam && positionalParams.nonEmpty) {
+          throw QueryCompilationErrors.invalidQueryMixedQueryParameters()
+        }
+
+        // Validate: if query uses named parameters, all USING expressions 
must have names
+        if (hasNamedParam && positionalParams.isEmpty) {
+          if (paramNames.isEmpty) {
+            // Query uses named parameters but no USING expressions provided
+            throw 
QueryCompilationErrors.invalidQueryAllParametersMustBeNamed(Seq.empty)
+          } else {
+            // Check that all USING expressions have names
+            val unnamedExpressions = paramNames.zipWithIndex.collect {
+              case ("", index) => index // empty strings are unnamed
+            }
+            if (unnamedExpressions.nonEmpty) {
+              val unnamedExprs = unnamedExpressions.map(args(_))
+              throw 
QueryCompilationErrors.invalidQueryAllParametersMustBeNamed(unnamedExprs)
+            }
+          }
+        }
+
+        // Now we can do simple binding based on which type we determined
+        if (hasNamedParam) {
+          // Named parameter binding - paramNames guaranteed to have no nulls 
at this point
+          val namedArgsMap = paramNames.zip(args).toMap
+          bind(child) {
+            case NamedParameter(name) => namedArgsMap.getOrElse(name, 
NamedParameter(name))
+          }
+        } else {
+          // Positional parameter binding (same logic as PosParameterizedQuery)
+          val posToIndex = positionalParams.toSeq.sorted.zipWithIndex.toMap
+          bind(child) {
+            case PosParameter(pos) if posToIndex.contains(pos) && args.size > 
posToIndex(pos) =>
+              args(posToIndex(pos))
+          }
+        }
+
       case other => other
     }
   }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
index 2e0dbe9b7f42..b759c70266f7 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.{FunctionIdentifier, 
InternalRow, TableIden
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, 
ExprCode}
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
-import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, 
UnaryNode}
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, 
SupportsSubquery, UnaryNode}
 import org.apache.spark.sql.catalyst.trees.TreePattern._
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.catalyst.util.TypeUtils.toSQLId
@@ -1212,3 +1212,19 @@ trait UnresolvedPlanId extends LeafExpression with 
Unevaluable {
   // Subclasses can override this function to provide more TreePatterns.
   def nodePatternsInternal(): Seq[TreePattern] = Seq()
 }
+
+/**
+ * Logical plan representing execute immediate query.
+ *
+ * @param sqlStmtStr the query expression (first child)
+ * @param args parameters from USING clause (subsequent children)
+ * @param targetVariables variables to store the result of the query
+ */
+case class UnresolvedExecuteImmediate(
+    sqlStmtStr: Expression,
+    args: Seq[Expression],
+    targetVariables: Seq[Expression])
+  extends UnresolvedLeafNode with SupportsSubquery {
+
+  final override val nodePatterns: Seq[TreePattern] = Seq(EXECUTE_IMMEDIATE)
+}
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 4e930280381c..b42cba86e0fb 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -22,7 +22,6 @@ import java.util.concurrent.TimeUnit
 
 import scala.collection.mutable.{ArrayBuffer, HashMap, ListBuffer, Set}
 import scala.jdk.CollectionConverters._
-import scala.util.{Left, Right}
 
 import org.antlr.v4.runtime.{ParserRuleContext, RuleContext, Token}
 import org.antlr.v4.runtime.tree.{ParseTree, RuleNode, TerminalNode}
@@ -1149,7 +1148,7 @@ class AstBuilder extends DataTypeAstBuilder
   }
 
   /**
-   * Returns the parameters for [[ExecuteImmediateQuery]] logical plan.
+   * Returns the parameters for [[UnresolvedExecuteImmediate]] logical plan.
    * Expected format:
    * {{{
    *   EXECUTE IMMEDIATE {query_string|string_literal}
@@ -1157,11 +1156,8 @@ class AstBuilder extends DataTypeAstBuilder
    * }}}
    */
   override def visitExecuteImmediate(ctx: ExecuteImmediateContext): 
LogicalPlan = withOrigin(ctx) {
-    // Because of how parsing rules are written, we know that either
-    // queryParam or targetVariable is non null - hence use Either to 
represent this.
-    val queryString = Option(ctx.queryParam.stringLit()).map(sl => 
Left(string(visitStringLit(sl))))
-    val queryVariable = Option(ctx.queryParam.multipartIdentifier)
-      .map(mpi => Right(UnresolvedAttribute(visitMultipartIdentifier(mpi))))
+    // With the new grammar, queryParam is now an expression
+    val queryParam = expression(ctx.queryParam)
 
     val targetVars = Option(ctx.targetVariable).toSeq
       .flatMap(v => visitMultipartIdentifierList(v))
@@ -1169,8 +1165,7 @@ class AstBuilder extends DataTypeAstBuilder
       visitExecuteImmediateUsing(_)
     }.getOrElse{ Seq.empty }
 
-
-    ExecuteImmediateQuery(exprs, queryString.getOrElse(queryVariable.get), 
targetVars)
+    UnresolvedExecuteImmediate(queryParam, exprs, targetVars)
   }
 
   override def visitExecuteImmediateUsing(
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala
index e7b59af5e776..c68b8a2c29af 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala
@@ -97,7 +97,7 @@ object RuleIdCollection {
       "org.apache.spark.sql.catalyst.analysis.ResolveOrderByAll" ::
       
"org.apache.spark.sql.catalyst.analysis.ResolveRowLevelCommandAssignments" ::
       "org.apache.spark.sql.catalyst.analysis.ResolveSetVariable" ::
-      "org.apache.spark.sql.catalyst.analysis.SubstituteExecuteImmediate" ::
+      "org.apache.spark.sql.catalyst.analysis.ResolveExecuteImmediate" ::
       "org.apache.spark.sql.catalyst.analysis.ResolveTableSpec" ::
       "org.apache.spark.sql.catalyst.analysis.ResolveTimeZone" ::
       "org.apache.spark.sql.catalyst.analysis.ResolveUnion" ::
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index 74eed741622f..d1c81990f4f4 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -4131,10 +4131,10 @@ private[sql] object QueryCompilationErrors extends 
QueryErrorsBase with Compilat
       messageParameters = Map.empty)
   }
 
-  def invalidExecuteImmediateVariableType(dataType: DataType): Throwable = {
+  def invalidExecuteImmediateExpressionType(dataType: DataType): Throwable = {
     throw new AnalysisException(
-      errorClass = "INVALID_VARIABLE_TYPE_FOR_QUERY_EXECUTE_IMMEDIATE",
-      messageParameters = Map("varType" -> toSQLType(dataType)))
+      errorClass = "INVALID_EXPR_TYPE_FOR_QUERY_EXECUTE_IMMEDIATE",
+      messageParameters = Map("exprType" -> toSQLType(dataType)))
   }
 
   def nullSQLStringExecuteImmediate(varName: String): Throwable = {
@@ -4149,12 +4149,6 @@ private[sql] object QueryCompilationErrors extends 
QueryErrorsBase with Compilat
       messageParameters = Map("sqlString" -> toSQLStmt(queryString)))
   }
 
-  def nestedExecuteImmediate(queryString: String): Throwable = {
-    throw new AnalysisException(
-      errorClass = "NESTED_EXECUTE_IMMEDIATE",
-      messageParameters = Map("sqlString" -> toSQLStmt(queryString)))
-  }
-
   def sqlScriptInExecuteImmediate(sqlScriptString: String): Throwable = {
     throw new AnalysisException(
       errorClass = "SQL_SCRIPT_IN_EXECUTE_IMMEDIATE",
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
index a301f77cf0c0..94f650bc35c7 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
@@ -834,88 +834,10 @@ class AnalysisErrorSuite extends AnalysisTest with 
DataTypeErrorsBase {
       """"explode(array(min(a)))", "explode(array(max(a)))"""" :: Nil
   )
 
-  errorConditionTest(
-    "EXEC IMMEDIATE - nested execute immediate not allowed",
-    CatalystSqlParser.parsePlan("EXECUTE IMMEDIATE 'EXECUTE IMMEDIATE 
\\\'SELECT 42\\\''"),
-    "NESTED_EXECUTE_IMMEDIATE",
-    Map(
-      "sqlString" -> "EXECUTE IMMEDIATE 'SELECT 42'"))
-
-  errorConditionTest(
-    "EXEC IMMEDIATE - both positional and named used",
-    CatalystSqlParser.parsePlan("EXECUTE IMMEDIATE 'SELECT 42 where ? = 
:first'" +
-      " USING 1, 2 as first"),
-    "INVALID_QUERY_MIXED_QUERY_PARAMETERS",
-    Map.empty)
-
-  test("EXEC IMMEDIATE - non string variable as sqlString parameter") {
-    val execImmediatePlan = ExecuteImmediateQuery(
-      Seq.empty,
-      scala.util.Right(UnresolvedAttribute("testVarA")),
-      Seq(UnresolvedAttribute("testVarA")))
-
-    assertAnalysisErrorCondition(
-      inputPlan = execImmediatePlan,
-      expectedErrorCondition = 
"INVALID_VARIABLE_TYPE_FOR_QUERY_EXECUTE_IMMEDIATE",
-      expectedMessageParameters = Map(
-        "varType" -> "\"INT\""
-      ))
-  }
-
-  test("EXEC IMMEDIATE - Null string as sqlString parameter") {
-    val execImmediatePlan = ExecuteImmediateQuery(
-      Seq.empty,
-      scala.util.Right(UnresolvedAttribute("testVarNull")),
-      Seq(UnresolvedAttribute("testVarNull")))
-
-    assertAnalysisErrorCondition(
-      inputPlan = execImmediatePlan,
-      expectedErrorCondition = "NULL_QUERY_STRING_EXECUTE_IMMEDIATE",
-      expectedMessageParameters = Map("varName" -> "`testVarNull`"))
-  }
 
 
-  test("EXEC IMMEDIATE - Unsupported expr for parameter") {
-    val execImmediatePlan: LogicalPlan = ExecuteImmediateQuery(
-      Seq(UnresolvedAttribute("testVarA"), NaNvl(Literal(1), Literal(1))),
-      scala.util.Left("SELECT ?"),
-      Seq.empty)
 
-    assertAnalysisErrorCondition(
-      inputPlan = execImmediatePlan,
-      expectedErrorCondition = "UNSUPPORTED_EXPR_FOR_PARAMETER",
-      expectedMessageParameters = Map(
-        "invalidExprSql" -> "\"nanvl(1, 1)\""
-      ))
-  }
-
-  test("EXEC IMMEDIATE - Name Parametrize query with non named parameters") {
-    val execImmediateSetVariablePlan = ExecuteImmediateQuery(
-      Seq(Literal(2), new Alias(UnresolvedAttribute("testVarA"), "first")(), 
Literal(3)),
-      scala.util.Left("SELECT :first"),
-      Seq.empty)
-
-    assertAnalysisErrorCondition(
-      inputPlan = execImmediateSetVariablePlan,
-      expectedErrorCondition = "ALL_PARAMETERS_MUST_BE_NAMED",
-      expectedMessageParameters = Map(
-        "exprs" -> "\"2\", \"3\""
-      ))
-  }
-
-  test("EXEC IMMEDIATE - INTO specified for COMMAND query") {
-    val execImmediateSetVariablePlan = ExecuteImmediateQuery(
-      Seq.empty,
-      scala.util.Left("SET VAR testVarA = 1"),
-      Seq(UnresolvedAttribute("testVarA")))
 
-    assertAnalysisErrorCondition(
-      inputPlan = execImmediateSetVariablePlan,
-      expectedErrorCondition = "INVALID_STATEMENT_FOR_EXECUTE_INTO",
-      expectedMessageParameters = Map(
-        "sqlString" -> "SET VAR TESTVARA = 1"
-      ))
-  }
 
   test("SPARK-6452 regression test") {
     // CheckAnalysis should throw AnalysisException when Aggregate contains 
missing attribute(s)
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
index 0c8d2bae418a..5992f2d099bd 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
@@ -30,7 +30,7 @@ import org.apache.spark.SparkException
 import org.apache.spark.api.python.PythonEvalType
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{AliasIdentifier, QueryPlanningTracker, 
TableIdentifier}
-import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog, 
VariableDefinition}
+import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
@@ -42,7 +42,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, 
Partitioning, RangePartitioning, RoundRobinPartitioning}
 import org.apache.spark.sql.catalyst.types.DataTypeUtils
 import org.apache.spark.sql.catalyst.util._
-import org.apache.spark.sql.connector.catalog.{Identifier, InMemoryTable}
+import org.apache.spark.sql.connector.catalog.InMemoryTable
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.internal.SQLConf
@@ -1520,34 +1520,6 @@ class AnalysisSuite extends AnalysisTest with Matchers {
     assertAnalysisSuccess(finalPlan)
   }
 
-  test("Execute Immediate plan transformation") {
-    try {
-    val varDef1 = VariableDefinition(Identifier.of(Array("res"), "res"), "1", 
Literal(1))
-    SimpleAnalyzer.catalogManager.tempVariableManager.create(
-      Seq("res", "res"), varDef1, overrideIfExists = true)
-
-    val varDef2 = VariableDefinition(Identifier.of(Array("res2"), "res2"), 
"1", Literal(1))
-    SimpleAnalyzer.catalogManager.tempVariableManager.create(
-      Seq("res2", "res2"), varDef2, overrideIfExists = true)
-    val actual1 = parsePlan("EXECUTE IMMEDIATE 'SELECT 42 WHERE ? = 1' USING 
2").analyze
-    val expected1 = parsePlan("SELECT 42 where 2 = 1").analyze
-    comparePlans(actual1, expected1)
-    val actual2 = parsePlan(
-      "EXECUTE IMMEDIATE 'SELECT 42 WHERE :first = 1' USING 2 as 
first").analyze
-    val expected2 = parsePlan("SELECT 42 where 2 = 1").analyze
-    comparePlans(actual2, expected2)
-    // Test that plan is transformed to SET operation
-    val actual3 = parsePlan(
-      "EXECUTE IMMEDIATE 'SELECT 17, 7 WHERE ? = 1' INTO res, res2 USING 
2").analyze
-      // Normalize to make the plan equivalent to the below set statement.
-    val expected3 = parsePlan("SET var (res, res2) = (SELECT 17, 7 where 2 = 
1)").analyze
-      comparePlans(actual3, expected3)
-    } finally {
-      SimpleAnalyzer.catalogManager.tempVariableManager.remove(Seq("res"))
-      SimpleAnalyzer.catalogManager.tempVariableManager.remove(Seq("res2"))
-    }
-  }
-
   test("SPARK-41271: bind named parameters to literals") {
     CTERelationDef.curId.set(0)
     val actual1 = NameParameterizedQuery(
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveExecuteImmediate.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveExecuteImmediate.scala
new file mode 100644
index 000000000000..43d08c47b14a
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveExecuteImmediate.scala
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.SqlScriptingContextManager
+import org.apache.spark.sql.catalyst.expressions.{Alias, EmptyRow, Expression, 
Literal,
+  VariableReference}
+import org.apache.spark.sql.catalyst.plans.logical.{Command, CompoundBody, 
LogicalPlan, SetVariable}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreePattern.EXECUTE_IMMEDIATE
+import org.apache.spark.sql.connector.catalog.CatalogManager
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.types.StringType
+
+/**
+ * Analysis rule that resolves and executes EXECUTE IMMEDIATE statements 
during analysis,
+ * replacing them with the results, similar to how CALL statements work.
+ * This rule combines resolution and execution in a single pass.
+ */
+case class ResolveExecuteImmediate(sparkSession: SparkSession, catalogManager: 
CatalogManager)
+  extends Rule[LogicalPlan] {
+
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    plan.resolveOperatorsWithPruning(_.containsPattern(EXECUTE_IMMEDIATE), 
ruleId) {
+      case node @ UnresolvedExecuteImmediate(sqlStmtStr, args, 
targetVariables) =>
+        if (sqlStmtStr.resolved && targetVariables.forall(_.resolved) && 
args.forall(_.resolved)) {
+          // All resolved - execute immediately and handle INTO clause if 
present
+          if (targetVariables.nonEmpty) {
+            // EXECUTE IMMEDIATE ... INTO should generate SetVariable plan 
with eagerly executed
+            // source
+            val finalTargetVars = extractTargetVariables(targetVariables)
+            val executedSource = executeImmediateQuery(sqlStmtStr, args, 
hasIntoClause = true)
+            SetVariable(finalTargetVars, executedSource)
+          } else {
+            // Regular EXECUTE IMMEDIATE without INTO - execute and return 
result directly
+            executeImmediateQuery(sqlStmtStr, args, hasIntoClause = false)
+          }
+        } else {
+          // Not all resolved yet - wait for next iteration
+          node
+        }
+    }
+  }
+
+  private def extractTargetVariables(targetVariables: Seq[Expression]): 
Seq[VariableReference] = {
+    targetVariables.map {
+      case alias: Alias =>
+        // Extract the VariableReference from the alias
+        alias.child match {
+          case varRef: VariableReference =>
+            // Use resolved VariableReference directly with canFold = false
+            varRef.copy(canFold = false)
+          case _ =>
+            throw 
QueryCompilationErrors.unsupportedParameterExpression(alias.child)
+        }
+      case varRef: VariableReference =>
+        // Use resolved VariableReference directly with canFold = false
+        varRef.copy(canFold = false)
+      case other =>
+        throw QueryCompilationErrors.unsupportedParameterExpression(other)
+    }
+  }
+
+  private def executeImmediateQuery(
+      sqlStmtStr: Expression,
+      args: Seq[Expression],
+      hasIntoClause: Boolean): LogicalPlan = {
+    // Extract the query string from the queryParam expression
+    val sqlString = extractQueryString(sqlStmtStr)
+
+    // Parse and validate the query
+    val parsedPlan = sparkSession.sessionState.sqlParser.parsePlan(sqlString)
+    validateQuery(sqlString, parsedPlan)
+
+    // Execute the query recursively with isolated local variable context
+    val result = if (args.isEmpty) {
+      // No parameters - execute directly
+      withIsolatedLocalVariableContext {
+        sparkSession.sql(sqlString)
+      }
+    } else {
+      // For parameterized queries, build parameter arrays
+      val (paramValues, paramNames) = buildUnifiedParameters(args)
+
+      withIsolatedLocalVariableContext {
+        sparkSession.asInstanceOf[org.apache.spark.sql.classic.SparkSession]
+          .sql(sqlString, paramValues, paramNames)
+      }
+    }
+
+    // If this EXECUTE IMMEDIATE has an INTO clause, commands are not allowed
+    if (hasIntoClause && result.queryExecution.analyzed.isInstanceOf[Command]) 
{
+      throw QueryCompilationErrors.invalidStatementForExecuteInto(sqlString)
+    }
+
+    // For commands, use commandExecuted to avoid double execution
+    // For queries, use analyzed to avoid eager evaluation
+    if (result.queryExecution.analyzed.isInstanceOf[Command]) {
+      result.queryExecution.commandExecuted
+    } else {
+      result.queryExecution.analyzed
+    }
+  }
+
+  private def extractQueryString(queryExpr: Expression): String = {
+    // Ensure the expression resolves to string type
+    if (!queryExpr.dataType.sameType(StringType)) {
+      throw 
QueryCompilationErrors.invalidExecuteImmediateExpressionType(queryExpr.dataType)
+    }
+
+    // Evaluate the expression to get the query string
+    val value = queryExpr.eval(null)
+    if (value == null) {
+      // Extract the original text from the expression's origin for the error 
message
+      val originalText = extractOriginalText(queryExpr)
+      throw QueryCompilationErrors.nullSQLStringExecuteImmediate(originalText)
+    }
+
+    value.toString
+  }
+
+  private def extractOriginalText(queryExpr: Expression): String = {
+    val origin = queryExpr.origin
+    // Try to extract the original text from the origin information
+    (origin.sqlText, origin.startIndex, origin.stopIndex) match {
+      case (Some(sqlText), Some(startIndex), Some(stopIndex)) =>
+        // Extract the substring from the original SQL text
+        sqlText.substring(startIndex, stopIndex + 1)
+      case _ =>
+        // Fallback to the SQL representation if origin information is not 
available
+        queryExpr.sql
+    }
+  }
+
+  private def validateQuery(queryString: String, parsedPlan: LogicalPlan): 
Unit = {
+    // Check for compound bodies (SQL scripting)
+    if (parsedPlan.isInstanceOf[CompoundBody]) {
+      throw QueryCompilationErrors.sqlScriptInExecuteImmediate(queryString)
+    }
+  }
+
+  /**
+   * Builds parameter arrays for the sql() API.
+   */
+  private def buildUnifiedParameters(args: Seq[Expression]): (Array[Any], 
Array[String]) = {
+    val values = scala.collection.mutable.ListBuffer[Any]()
+    val names = scala.collection.mutable.ListBuffer[String]()
+
+    args.foreach {
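+      // Named parameter: an aliased argument supplies both the value and its name.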
+      case alias: Alias =>
+        val paramValue = evaluateParameterExpression(alias.child)
+        values += paramValue
+        names += alias.name
+      case expr =>
+        // Positional parameter: just a value
+        val paramValue = evaluateParameterExpression(expr)
+        values += paramValue
+        names += "" // unnamed expression
+    }
+
+    (values.toArray, names.toArray)
+  }
+
+  /**
+   * Evaluates a parameter expression to its value. Only variable references and
+   * foldable (constant) expressions are supported; anything else, such as a
+   * subquery, is rejected with UNSUPPORTED_EXPR_FOR_PARAMETER.
+   */
+  private def evaluateParameterExpression(expr: Expression): Any = {
+    expr match {
+      case varRef: VariableReference =>
+        // Variable references should be evaluated to their values
+        varRef.eval(EmptyRow)
+      case foldable if foldable.foldable =>
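+        // Fold the constant expression and normalize the result through Literal.create.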
+        Literal.create(foldable.eval(EmptyRow), foldable.dataType).value
+      case other =>
+        // Expression is not foldable - not supported for parameters
+        throw QueryCompilationErrors.unsupportedParameterExpression(other)
+    }
+  }
+
+  /**
+   * Temporarily isolates the SQL scripting context during EXECUTE IMMEDIATE 
execution.
+   * This makes withinSqlScript() return false, ensuring that statements 
within EXECUTE IMMEDIATE
+   * are not affected by the outer SQL script context (e.g., local variables, 
script-specific
+   * errors).
+   */
+  private def withIsolatedLocalVariableContext[A](f: => A): A = {
+    // Completely clear the SQL scripting context to make withinSqlScript() 
return false
+    val handle = SqlScriptingContextManager.create(null)
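+    // Installing a null context makes withinSqlScript() report false while f runs.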
+    handle.runWith(f)
+  }
+}
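
For reference, the command-versus-query distinction enforced above is visible at the
SQL level; a small illustration (table and variable names are placeholders, not taken
from the test suite):

```sql
-- A command nested in EXECUTE IMMEDIATE is executed eagerly and succeeds:
EXECUTE IMMEDIATE 'REFRESH TABLE my_table';

-- But a command is not a query, so it cannot feed an INTO assignment; this
-- raises INVALID_STATEMENT_FOR_EXECUTE_INTO:
EXECUTE IMMEDIATE 'SET VAR my_var = 1' INTO my_var;
```
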
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/classic/SparkSession.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/classic/SparkSession.scala
index 681e1b16af59..c7a008ab1128 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/classic/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/classic/SparkSession.scala
@@ -40,7 +40,7 @@ import org.apache.spark.sql
 import org.apache.spark.sql.{Artifact, DataSourceRegistration, Encoder, 
Encoders, ExperimentalMethods, Row, SparkSessionBuilder, SparkSessionCompanion, 
SparkSessionExtensions, SparkSessionExtensionsProvider, UDTFRegistration}
 import org.apache.spark.sql.artifact.ArtifactManager
 import org.apache.spark.sql.catalyst._
-import org.apache.spark.sql.catalyst.analysis.{NameParameterizedQuery, 
PosParameterizedQuery, UnresolvedRelation}
+import org.apache.spark.sql.catalyst.analysis.{GeneralParameterizedQuery, 
NameParameterizedQuery, PosParameterizedQuery, UnresolvedRelation}
 import org.apache.spark.sql.catalyst.encoders._
 import org.apache.spark.sql.catalyst.expressions.AttributeReference
 import org.apache.spark.sql.catalyst.parser.ParserInterface
@@ -450,8 +450,8 @@ class SparkSession private(
       val plan = tracker.measurePhase(QueryPlanningTracker.PARSING) {
         val parsedPlan = sessionState.sqlParser.parsePlan(sqlText)
         if (args.nonEmpty) {
+          // Check for SQL scripting with positional parameters before 
creating parameterized query
           if (parsedPlan.isInstanceOf[CompoundBody]) {
-            // Positional parameters are not supported for SQL scripting.
             throw 
SqlScriptingErrors.positionalParametersAreNotSupportedWithSqlScripting()
           }
           PosParameterizedQuery(parsedPlan, 
args.map(lit(_).expr).toImmutableArraySeq)
@@ -509,6 +509,45 @@ class SparkSession private(
     sql(sqlText, args.asScala.toMap)
   }
 
+  /**
+   * Executes a SQL query, substituting parameters with the given arguments and their
+   * optional names, and returns the result as a `DataFrame`. The inner query itself
+   * determines whether positional or named parameters apply, based on its parameter
+   * markers.
+   */
+  private[sql] def sql(sqlText: String, args: Array[_], paramNames: 
Array[String]): DataFrame = {
+    sql(sqlText, args, paramNames, new QueryPlanningTracker)
+  }
+
+  /**
+   * Internal implementation of the unified parameter API, with an explicit tracker.
+   */
+  private[sql] def sql(
+      sqlText: String,
+      args: Array[_],
+      paramNames: Array[String],
+      tracker: QueryPlanningTracker): DataFrame =
+    withActive {
+      val plan = tracker.measurePhase(QueryPlanningTracker.PARSING) {
+        val parsedPlan = sessionState.sqlParser.parsePlan(sqlText)
+        if (args.nonEmpty) {
+          // Check for SQL scripting with positional parameters before 
creating parameterized query
+          if (parsedPlan.isInstanceOf[CompoundBody] && paramNames.isEmpty) {
+            throw 
SqlScriptingErrors.positionalParametersAreNotSupportedWithSqlScripting()
+          }
+          // Create a general parameterized query that can handle both positional and
+          // named parameters. The query itself determines which kind to use, based
+          // on its parameter markers.
+          GeneralParameterizedQuery(
+            parsedPlan,
+            args.map(lit(_).expr).toImmutableArraySeq,
+            paramNames.toImmutableArraySeq
+          )
+        } else {
+          parsedPlan
+        }
+      }
+      Dataset.ofRows(self, plan, tracker)
+    }
+
   /** @inheritdoc */
   override def sql(sqlText: String): DataFrame = sql(sqlText, 
Map.empty[String, Any])
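
The new overload above is `private[sql]`, so only internal callers such as the new
`ResolveExecuteImmediate` rule can reach it. A minimal sketch of the calling
convention, assuming `spark` is an `org.apache.spark.sql.classic.SparkSession`
(table and parameter names are illustrative):

```scala
// EXECUTE IMMEDIATE 'SELECT * FROM t WHERE id = :id' USING 10 AS id
// would be forwarded roughly as:
val df = spark.sql(
  "SELECT * FROM t WHERE id = :id",
  Array(10),     // argument values, in USING order
  Array("id"))   // argument names; "" marks an unnamed (positional) argument
```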
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala
index a3591ff89e5c..e31e7e8d704c 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.command
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.LogKeys.{CONFIG, CONFIG2, KEY, VALUE}
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
-import org.apache.spark.sql.catalyst.analysis.{AnalysisContext, 
VariableResolution}
+import org.apache.spark.sql.catalyst.analysis.VariableResolution
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.plans.logical.IgnoreCachedData
@@ -112,8 +112,7 @@ case class SetCommand(kv: Option[(String, Option[String])])
             
sparkSession.sessionState.analyzer.catalogManager.tempVariableManager
           )
           val variable = variableResolution.lookupVariable(
-            nameParts = varName,
-            resolvingExecuteImmediate = AnalysisContext.get.isExecuteImmediate
+            nameParts = varName
           )
           if (variable.isDefined) {
             throw new AnalysisException(
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
index ada30cde27cd..5abb1e75543a 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.internal
 import org.apache.spark.annotation.Unstable
 import org.apache.spark.sql.{DataSourceRegistration, ExperimentalMethods, 
SparkSessionExtensions, UDTFRegistration}
 import org.apache.spark.sql.artifact.ArtifactManager
-import org.apache.spark.sql.catalyst.analysis.{Analyzer, 
EvalSubqueriesForTimeTravel, FunctionRegistry, InvokeProcedures, 
ReplaceCharWithVarchar, ResolveDataSource, ResolveSessionCatalog, 
ResolveTranspose, TableFunctionRegistry}
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, 
EvalSubqueriesForTimeTravel, FunctionRegistry, InvokeProcedures, 
ReplaceCharWithVarchar, ResolveDataSource, ResolveExecuteImmediate, 
ResolveSessionCatalog, ResolveTranspose, TableFunctionRegistry}
 import org.apache.spark.sql.catalyst.analysis.resolver.ResolverExtension
 import org.apache.spark.sql.catalyst.catalog.{FunctionExpressionBuilder, 
SessionCatalog}
 import org.apache.spark.sql.catalyst.expressions.{Expression, 
ExtractSemiStructuredFields}
@@ -244,6 +244,7 @@ abstract class BaseSessionStateBuilder(
         new EvalSubqueriesForTimeTravel +:
         new ResolveTranspose(session) +:
         new InvokeProcedures(session) +:
+        ResolveExecuteImmediate(session, this.catalogManager) +:
         ExtractSemiStructuredFields +:
         customResolutionRules
 
diff --git 
a/sql/core/src/test/resources/sql-tests/analyzer-results/execute-immediate.sql.out
 
b/sql/core/src/test/resources/sql-tests/analyzer-results/execute-immediate.sql.out
index d575cac56d28..b576f0454d45 100644
--- 
a/sql/core/src/test/resources/sql-tests/analyzer-results/execute-immediate.sql.out
+++ 
b/sql/core/src/test/resources/sql-tests/analyzer-results/execute-immediate.sql.out
@@ -48,20 +48,22 @@ SetVariable 
[variablereference(system.session.sql_string=CAST(NULL AS STRING))]
 -- !query
 EXECUTE IMMEDIATE 'SET spark.sql.ansi.enabled=true'
 -- !query analysis
-SetCommand (spark.sql.ansi.enabled,Some(true))
+CommandResult [key#x, value#x], Execute SetCommand, 
[[spark.sql.ansi.enabled,true]]
+   +- SetCommand (spark.sql.ansi.enabled,Some(true))
 
 
 -- !query
 EXECUTE IMMEDIATE 'CREATE TEMPORARY VIEW IDENTIFIER(:tblName) AS SELECT id, 
name FROM tbl_view' USING 'tbl_view_tmp' as tblName
 -- !query analysis
-CreateViewCommand `tbl_view_tmp`, SELECT id, name FROM tbl_view, false, false, 
LocalTempView, UNSUPPORTED, true
-   +- Project [id#x, name#x]
-      +- SubqueryAlias tbl_view
-         +- View (`tbl_view`, [id#x, name#x, data#x])
-            +- Project [cast(id#x as int) AS id#x, cast(name#x as string) AS 
name#x, cast(data#x as struct<f1:int,s2:struct<f2:int,f3:string>>) AS data#x]
-               +- Project [id#x, name#x, data#x]
-                  +- SubqueryAlias tbl_view
-                     +- LocalRelation [id#x, name#x, data#x]
+CommandResult Execute CreateViewCommand
+   +- CreateViewCommand `tbl_view_tmp`, SELECT id, name FROM tbl_view, false, 
false, LocalTempView, UNSUPPORTED, true
+         +- Project [id#x, name#x]
+            +- SubqueryAlias tbl_view
+               +- View (`tbl_view`, [id#x, name#x, data#x])
+                  +- Project [cast(id#x as int) AS id#x, cast(name#x as 
string) AS name#x, cast(data#x as struct<f1:int,s2:struct<f2:int,f3:string>>) 
AS data#x]
+                     +- Project [id#x, name#x, data#x]
+                        +- SubqueryAlias tbl_view
+                           +- LocalRelation [id#x, name#x, data#x]
 
 
 -- !query
@@ -83,7 +85,8 @@ Project [id#x, name#x]
 -- !query
 EXECUTE IMMEDIATE 'REFRESH TABLE IDENTIFIER(:tblName)' USING 'x' as tblName
 -- !query analysis
-RefreshTableCommand `spark_catalog`.`default`.`x`
+CommandResult Execute RefreshTableCommand
+   +- RefreshTableCommand `spark_catalog`.`default`.`x`
 
 
 -- !query
@@ -152,7 +155,7 @@ Project [id#x, name#x, data#x]
 EXECUTE IMMEDIATE sql_string USING a, 'name2'
 -- !query analysis
 Project [id#x, name#x, data#x]
-+- Filter ((name#x = variablereference(system.session.a='name1')) OR (name#x = 
name2))
++- Filter ((name#x = name1) OR (name#x = name2))
    +- SubqueryAlias tbl_view
       +- View (`tbl_view`, [id#x, name#x, data#x])
          +- Project [cast(id#x as int) AS id#x, cast(name#x as string) AS 
name#x, cast(data#x as struct<f1:int,s2:struct<f2:int,f3:string>>) AS data#x]
@@ -178,7 +181,7 @@ Project [id#x, name#x, data#x]
 EXECUTE IMMEDIATE 'SELECT * from tbl_view where name = ? or name = ?' USING a, 
'name2'
 -- !query analysis
 Project [id#x, name#x, data#x]
-+- Filter ((name#x = variablereference(system.session.a='name1')) OR (name#x = 
name2))
++- Filter ((name#x = name1) OR (name#x = name2))
    +- SubqueryAlias tbl_view
       +- View (`tbl_view`, [id#x, name#x, data#x])
          +- Project [cast(id#x as int) AS id#x, cast(name#x as string) AS 
name#x, cast(data#x as struct<f1:int,s2:struct<f2:int,f3:string>>) AS data#x]
@@ -191,7 +194,7 @@ Project [id#x, name#x, data#x]
 EXECUTE IMMEDIATE 'SELECT * from tbl_view where name = ? or name = ?' USING 
(a, 'name2')
 -- !query analysis
 Project [id#x, name#x, data#x]
-+- Filter ((name#x = variablereference(system.session.a='name1')) OR (name#x = 
name2))
++- Filter ((name#x = name1) OR (name#x = name2))
    +- SubqueryAlias tbl_view
       +- View (`tbl_view`, [id#x, name#x, data#x])
          +- Project [cast(id#x as int) AS id#x, cast(name#x as string) AS 
name#x, cast(data#x as struct<f1:int,s2:struct<f2:int,f3:string>>) AS data#x]
@@ -203,9 +206,10 @@ Project [id#x, name#x, data#x]
 -- !query
 EXECUTE IMMEDIATE 'INSERT INTO x VALUES(?)' USING 1
 -- !query analysis
-InsertIntoHadoopFsRelationCommand file:[not included in 
comparison]/{warehouse_dir}/x, false, CSV, [path=file:[not included in 
comparison]/{warehouse_dir}/x], Append, `spark_catalog`.`default`.`x`, 
org.apache.spark.sql.execution.datasources.InMemoryFileIndex(file:[not included 
in comparison]/{warehouse_dir}/x), [id]
-+- Project [col1#x AS id#x]
-   +- LocalRelation [col1#x]
+CommandResult Execute InsertIntoHadoopFsRelationCommand file:[not included in 
comparison]/{warehouse_dir}/x, false, CSV, [path=file:[not included in 
comparison]/{warehouse_dir}/x], Append, `spark_catalog`.`default`.`x`, 
org.apache.spark.sql.execution.datasources.InMemoryFileIndex(file:[not included 
in comparison]/{warehouse_dir}/x), [id]
+   +- InsertIntoHadoopFsRelationCommand file:[not included in 
comparison]/{warehouse_dir}/x, false, CSV, [path=file:[not included in 
comparison]/{warehouse_dir}/x], Append, `spark_catalog`.`default`.`x`, 
org.apache.spark.sql.execution.datasources.InMemoryFileIndex(file:[not included 
in comparison]/{warehouse_dir}/x), [id]
+      +- Project [col1#x AS id#x]
+         +- LocalRelation [col1#x]
 
 
 -- !query
@@ -256,7 +260,7 @@ Project [id#x, name#x, data#x]
 EXECUTE IMMEDIATE sql_string USING b as second, 'name7' as first
 -- !query analysis
 Project [id#x, name#x, data#x]
-+- Filter ((name#x = name7) OR (id#x = variablereference(system.session.b=40)))
++- Filter ((name#x = name7) OR (id#x = 40))
    +- SubqueryAlias tbl_view
       +- View (`tbl_view`, [id#x, name#x, data#x])
          +- Project [cast(id#x as int) AS id#x, cast(name#x as string) AS 
name#x, cast(data#x as struct<f1:int,s2:struct<f2:int,f3:string>>) AS data#x]
@@ -282,7 +286,7 @@ Project [id#x, name#x, data#x]
 EXECUTE IMMEDIATE 'SELECT * from tbl_view where name = :first or id = :second' 
USING 'name7' as first, b as second
 -- !query analysis
 Project [id#x, name#x, data#x]
-+- Filter ((name#x = name7) OR (id#x = variablereference(system.session.b=40)))
++- Filter ((name#x = name7) OR (id#x = 40))
    +- SubqueryAlias tbl_view
       +- View (`tbl_view`, [id#x, name#x, data#x])
          +- Project [cast(id#x as int) AS id#x, cast(name#x as string) AS 
name#x, cast(data#x as struct<f1:int,s2:struct<f2:int,f3:string>>) AS data#x]
@@ -307,9 +311,10 @@ Project [id#x, name#x, data#x, name7 AS p#x]
 -- !query
 EXECUTE IMMEDIATE 'SET VAR sql_string = ?' USING 'SELECT id from tbl_view 
where name = :first'
 -- !query analysis
-SetVariable [variablereference(system.session.sql_string='SELECT * from 
tbl_view where name = :first or id = :second')]
-+- Project [SELECT id from tbl_view where name = :first AS sql_string#x]
-   +- OneRowRelation
+CommandResult SetVariable [variablereference(system.session.sql_string='SELECT 
* from tbl_view where name = :first or id = :second')]
+   +- SetVariable [variablereference(system.session.sql_string='SELECT * from 
tbl_view where name = :first or id = :second')]
+      +- Project [SELECT id from tbl_view where name = :first AS sql_string#x]
+         +- OneRowRelation
 
 
 -- !query
@@ -356,7 +361,7 @@ SetVariable [variablereference(system.session.res_id=70)]
 +- GlobalLimit 2
    +- LocalLimit 2
       +- Project [id#x]
-         +- Filter (name#x = variablereference(system.session.a='name1'))
+         +- Filter (name#x = name1)
             +- SubqueryAlias tbl_view
                +- View (`tbl_view`, [id#x, name#x, data#x])
                   +- Project [cast(id#x as int) AS id#x, cast(name#x as 
string) AS name#x, cast(data#x as struct<f1:int,s2:struct<f2:int,f3:string>>) 
AS data#x]
@@ -422,7 +427,7 @@ Project [variablereference(system.session.b=10) AS b#x, 
variablereference(system
 EXECUTE IMMEDIATE 'SELECT * FROM tbl_view where id = ? AND name = ?' USING b 
as first, a
 -- !query analysis
 Project [id#x, name#x, data#x]
-+- Filter ((id#x = variablereference(system.session.b=10)) AND (name#x = 
variablereference(system.session.a='name1')))
++- Filter ((id#x = 10) AND (name#x = name1))
    +- SubqueryAlias tbl_view
       +- View (`tbl_view`, [id#x, name#x, data#x])
          +- Project [cast(id#x as int) AS id#x, cast(name#x as string) AS 
name#x, cast(data#x as struct<f1:int,s2:struct<f2:int,f3:string>>) AS data#x]
@@ -500,15 +505,34 @@ org.apache.spark.sql.AnalysisException
 
 
 -- !query
-EXECUTE IMMEDIATE 'SELECT * FROM tbl_view WHERE ? = id' USING id
+DECLARE OR REPLACE testvarA INT
+-- !query analysis
+CreateVariable defaultvalueexpression(null, null), true
++- ResolvedIdentifier 
org.apache.spark.sql.catalyst.analysis.FakeSystemCatalog$@xxxxxxxx, 
session.testvarA
+
+
+-- !query
+EXECUTE IMMEDIATE 'SET VAR testVarA = 1' INTO testVarA
 -- !query analysis
 org.apache.spark.sql.AnalysisException
 {
-  "errorClass" : "UNRESOLVED_VARIABLE",
-  "sqlState" : "42883",
+  "errorClass" : "INVALID_STATEMENT_FOR_EXECUTE_INTO",
+  "sqlState" : "07501",
   "messageParameters" : {
-    "searchPath" : "`system`.`session`",
-    "variableName" : "`id`"
+    "sqlString" : "SET VAR TESTVARA = 1"
+  }
+}
+
+
+-- !query
+EXECUTE IMMEDIATE 'SELECT * FROM tbl_view WHERE ? = id' USING id
+-- !query analysis
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "UNRESOLVED_COLUMN.WITHOUT_SUGGESTION",
+  "sqlState" : "42703",
+  "messageParameters" : {
+    "objectName" : "`id`"
   },
   "queryContext" : [ {
     "objectType" : "",
@@ -590,10 +614,10 @@ EXECUTE IMMEDIATE b
 -- !query analysis
 org.apache.spark.sql.AnalysisException
 {
-  "errorClass" : "INVALID_VARIABLE_TYPE_FOR_QUERY_EXECUTE_IMMEDIATE",
+  "errorClass" : "INVALID_EXPR_TYPE_FOR_QUERY_EXECUTE_IMMEDIATE",
   "sqlState" : "42K09",
   "messageParameters" : {
-    "varType" : "\"INT\""
+    "exprType" : "\"INT\""
   }
 }
 
@@ -617,21 +641,14 @@ SetVariable [variablereference(system.session.a='name1')]
 -- !query
 EXECUTE IMMEDIATE 'SELECT * from tbl_view where name = :first' USING CONCAT(a 
, "me1") as first
 -- !query analysis
-org.apache.spark.sql.AnalysisException
-{
-  "errorClass" : "UNSUPPORTED_EXPR_FOR_PARAMETER",
-  "sqlState" : "42K0E",
-  "messageParameters" : {
-    "invalidExprSql" : "\"CONCAT(a, me1)\""
-  },
-  "queryContext" : [ {
-    "objectType" : "",
-    "objectName" : "",
-    "startIndex" : 70,
-    "stopIndex" : 86,
-    "fragment" : "CONCAT(a , \"me1\")"
-  } ]
-}
+Project [id#x, name#x, data#x]
++- Filter (name#x = name1)
+   +- SubqueryAlias tbl_view
+      +- View (`tbl_view`, [id#x, name#x, data#x])
+         +- Project [cast(id#x as int) AS id#x, cast(name#x as string) AS 
name#x, cast(data#x as struct<f1:int,s2:struct<f2:int,f3:string>>) AS data#x]
+            +- Project [id#x, name#x, data#x]
+               +- SubqueryAlias tbl_view
+                  +- LocalRelation [id#x, name#x, data#x]
 
 
 -- !query
@@ -760,7 +777,7 @@ CreateVariable defaultvalueexpression(10, 10), false
 EXECUTE IMMEDIATE 'SELECT id FROM tbl_view WHERE id = :p' USING p
 -- !query analysis
 Project [id#x]
-+- Filter (id#x = variablereference(system.session.p=10))
++- Filter (id#x = 10)
    +- SubqueryAlias tbl_view
       +- View (`tbl_view`, [id#x, name#x, data#x])
          +- Project [cast(id#x as int) AS id#x, cast(name#x as string) AS 
name#x, cast(data#x as struct<f1:int,s2:struct<f2:int,f3:string>>) AS data#x]
@@ -796,36 +813,200 @@ org.apache.spark.sql.AnalysisException
 
 
 -- !query
-EXECUTE IMMEDIATE 'EXECUTE IMMEDIATE \'SELECT id FROM tbl_view WHERE id = ? 
USING 10\''
+EXECUTE IMMEDIATE 'EXECUTE IMMEDIATE \'SELECT id FROM tbl_view WHERE id = ?\' 
USING 10'
+-- !query analysis
+Project [id#x]
++- Filter (id#x = 10)
+   +- SubqueryAlias tbl_view
+      +- View (`tbl_view`, [id#x, name#x, data#x])
+         +- Project [cast(id#x as int) AS id#x, cast(name#x as string) AS 
name#x, cast(data#x as struct<f1:int,s2:struct<f2:int,f3:string>>) AS data#x]
+            +- Project [id#x, name#x, data#x]
+               +- SubqueryAlias tbl_view
+                  +- LocalRelation [id#x, name#x, data#x]
+
+
+-- !query
+SET VAR sql_string = null
+-- !query analysis
+SetVariable [variablereference(system.session.sql_string='SELECT * from 
tbl_view where name = :first or id = :second')]
++- Project [cast(sql_string#x as string) AS sql_string#x]
+   +- Project [null AS sql_string#x]
+      +- OneRowRelation
+
+
+-- !query
+EXECUTE IMMEDIATE sql_string
 -- !query analysis
 org.apache.spark.sql.AnalysisException
 {
-  "errorClass" : "NESTED_EXECUTE_IMMEDIATE",
-  "sqlState" : "07501",
+  "errorClass" : "NULL_QUERY_STRING_EXECUTE_IMMEDIATE",
+  "sqlState" : "22004",
   "messageParameters" : {
-    "sqlString" : "EXECUTE IMMEDIATE 'SELECT ID FROM TBL_VIEW WHERE ID = ? 
USING 10'"
+    "varName" : "`sql_string`"
   }
 }
 
 
 -- !query
-SET VAR sql_string = null
+SET VAR sql_string = 5
 -- !query analysis
-SetVariable [variablereference(system.session.sql_string='SELECT * from 
tbl_view where name = :first or id = :second')]
+SetVariable [variablereference(system.session.sql_string=CAST(NULL AS STRING))]
 +- Project [cast(sql_string#x as string) AS sql_string#x]
-   +- Project [null AS sql_string#x]
+   +- Project [5 AS sql_string#x]
       +- OneRowRelation
 
 
 -- !query
 EXECUTE IMMEDIATE sql_string
 -- !query analysis
+org.apache.spark.sql.catalyst.parser.ParseException
+{
+  "errorClass" : "PARSE_SYNTAX_ERROR",
+  "sqlState" : "42601",
+  "messageParameters" : {
+    "error" : "'5'",
+    "hint" : ""
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 1,
+    "stopIndex" : 28,
+    "fragment" : "EXECUTE IMMEDIATE sql_string"
+  } ]
+}
+
+
+-- !query
+SET VAR sql_string = 'hello'
+-- !query analysis
+SetVariable [variablereference(system.session.sql_string='5')]
++- Project [hello AS sql_string#x]
+   +- OneRowRelation
+
+
+-- !query
+EXECUTE IMMEDIATE length(sql_string)
+-- !query analysis
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "INVALID_EXPR_TYPE_FOR_QUERY_EXECUTE_IMMEDIATE",
+  "sqlState" : "42K09",
+  "messageParameters" : {
+    "exprType" : "\"INT\""
+  }
+}
+
+
+-- !query
+EXECUTE IMMEDIATE 'SELECT 42 where ? = :first' USING 1, 2 as first
+-- !query analysis
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "INVALID_QUERY_MIXED_QUERY_PARAMETERS",
+  "sqlState" : "42613"
+}
+
+
+-- !query
+DECLARE int_var INT
+-- !query analysis
+CreateVariable defaultvalueexpression(null, null), false
++- ResolvedIdentifier 
org.apache.spark.sql.catalyst.analysis.FakeSystemCatalog$@xxxxxxxx, 
session.int_var
+
+
+-- !query
+SET VAR int_var = 42
+-- !query analysis
+SetVariable [variablereference(system.session.int_var=CAST(NULL AS INT))]
++- Project [42 AS int_var#x]
+   +- OneRowRelation
+
+
+-- !query
+EXECUTE IMMEDIATE int_var
+-- !query analysis
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "INVALID_EXPR_TYPE_FOR_QUERY_EXECUTE_IMMEDIATE",
+  "sqlState" : "42K09",
+  "messageParameters" : {
+    "exprType" : "\"INT\""
+  }
+}
+
+
+-- !query
+DECLARE null_var STRING
+-- !query analysis
+CreateVariable defaultvalueexpression(null, null), false
++- ResolvedIdentifier 
org.apache.spark.sql.catalyst.analysis.FakeSystemCatalog$@xxxxxxxx, 
session.null_var
+
+
+-- !query
+SET VAR null_var = null
+-- !query analysis
+SetVariable [variablereference(system.session.null_var=CAST(NULL AS STRING))]
++- Project [cast(null_var#x as string) AS null_var#x]
+   +- Project [null AS null_var#x]
+      +- OneRowRelation
+
+
+-- !query
+EXECUTE IMMEDIATE null_var
+-- !query analysis
 org.apache.spark.sql.AnalysisException
 {
   "errorClass" : "NULL_QUERY_STRING_EXECUTE_IMMEDIATE",
   "sqlState" : "22004",
   "messageParameters" : {
-    "varName" : "`sql_string`"
+    "varName" : "`null_var`"
+  }
+}
+
+
+-- !query
+EXECUTE IMMEDIATE 'SELECT ?' USING (SELECT 1)
+-- !query analysis
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "UNSUPPORTED_EXPR_FOR_PARAMETER",
+  "sqlState" : "42K0E",
+  "messageParameters" : {
+    "invalidExprSql" : "\"scalarsubquery()\""
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 36,
+    "stopIndex" : 45,
+    "fragment" : "(SELECT 1)"
+  } ]
+}
+
+
+-- !query
+EXECUTE IMMEDIATE 'SELECT :first' USING 2, 3
+-- !query analysis
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "ALL_PARAMETERS_MUST_BE_NAMED",
+  "sqlState" : "07001",
+  "messageParameters" : {
+    "exprs" : "\"2\", \"3\""
+  }
+}
+
+
+-- !query
+EXECUTE IMMEDIATE (SELECT c FROM (VALUES(1)) AS T(c))
+-- !query analysis
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "INVALID_EXPR_TYPE_FOR_QUERY_EXECUTE_IMMEDIATE",
+  "sqlState" : "42K09",
+  "messageParameters" : {
+    "exprType" : "\"INT\""
   }
 }
 
diff --git 
a/sql/core/src/test/resources/sql-tests/analyzer-results/parse-query-correctness-old-behavior.sql.out
 
b/sql/core/src/test/resources/sql-tests/analyzer-results/parse-query-correctness-old-behavior.sql.out
index 1c9db4eaed6f..e73a4c547103 100644
--- 
a/sql/core/src/test/resources/sql-tests/analyzer-results/parse-query-correctness-old-behavior.sql.out
+++ 
b/sql/core/src/test/resources/sql-tests/analyzer-results/parse-query-correctness-old-behavior.sql.out
@@ -994,24 +994,36 @@ CreateVariable defaultvalueexpression(null, null), false
 -- !query
 EXECUTE IMMEDIATE 'SELECT 1 UNION SELECT 2 UNION SELECT 3' INTO v1
 -- !query analysis
-SetVariable [variablereference(system.session.v1=CAST(NULL AS INT))]
-+- Project [1 AS UNION#x]
-   +- OneRowRelation
+org.apache.spark.SparkException
+{
+  "errorClass" : "ROW_SUBQUERY_TOO_MANY_ROWS",
+  "sqlState" : "21000"
+}
 
 
 -- !query
 SELECT v1
 -- !query analysis
-Project [variablereference(system.session.v1=1) AS v1#x]
+Project [variablereference(system.session.v1=CAST(NULL AS INT)) AS v1#x]
 +- OneRowRelation
 
 
 -- !query
 EXECUTE IMMEDIATE 'SELECT 1 UNION SELECT 1 UNION SELECT 1' INTO v1
 -- !query analysis
-SetVariable [variablereference(system.session.v1=1)]
-+- Project [1 AS UNION#x]
-   +- OneRowRelation
+SetVariable [variablereference(system.session.v1=CAST(NULL AS INT))]
++- GlobalLimit 2
+   +- LocalLimit 2
+      +- Distinct
+         +- Union false, false
+            :- Distinct
+            :  +- Union false, false
+            :     :- Project [1 AS 1#x]
+            :     :  +- OneRowRelation
+            :     +- Project [1 AS 1#x]
+            :        +- OneRowRelation
+            +- Project [1 AS 1#x]
+               +- OneRowRelation
 
 
 -- !query
@@ -1025,8 +1037,14 @@ Project [variablereference(system.session.v1=1) AS v1#x]
 EXECUTE IMMEDIATE 'SELECT 1 EXCEPT SELECT 2 EXCEPT SELECT 3' INTO v1
 -- !query analysis
 SetVariable [variablereference(system.session.v1=1)]
-+- Project [1 AS EXCEPT#x]
-   +- OneRowRelation
++- Except false
+   :- Except false
+   :  :- Project [1 AS 1#x]
+   :  :  +- OneRowRelation
+   :  +- Project [2 AS 2#x]
+   :     +- OneRowRelation
+   +- Project [3 AS 3#x]
+      +- OneRowRelation
 
 
 -- !query
@@ -1040,38 +1058,56 @@ Project [variablereference(system.session.v1=1) AS v1#x]
 EXECUTE IMMEDIATE 'SELECT 1 EXCEPT SELECT 1 EXCEPT SELECT 1' INTO v1
 -- !query analysis
 SetVariable [variablereference(system.session.v1=1)]
-+- Project [1 AS EXCEPT#x]
-   +- OneRowRelation
++- Except false
+   :- Except false
+   :  :- Project [1 AS 1#x]
+   :  :  +- OneRowRelation
+   :  +- Project [1 AS 1#x]
+   :     +- OneRowRelation
+   +- Project [1 AS 1#x]
+      +- OneRowRelation
 
 
 -- !query
 SELECT v1
 -- !query analysis
-Project [variablereference(system.session.v1=1) AS v1#x]
+Project [variablereference(system.session.v1=CAST(NULL AS INT)) AS v1#x]
 +- OneRowRelation
 
 
 -- !query
 EXECUTE IMMEDIATE 'SELECT 1 INTERSECT SELECT 2 INTERSECT SELECT 3' INTO v1
 -- !query analysis
-SetVariable [variablereference(system.session.v1=1)]
-+- Project [1 AS INTERSECT#x]
-   +- OneRowRelation
+SetVariable [variablereference(system.session.v1=CAST(NULL AS INT))]
++- Intersect false
+   :- Intersect false
+   :  :- Project [1 AS 1#x]
+   :  :  +- OneRowRelation
+   :  +- Project [2 AS 2#x]
+   :     +- OneRowRelation
+   +- Project [3 AS 3#x]
+      +- OneRowRelation
 
 
 -- !query
 SELECT v1
 -- !query analysis
-Project [variablereference(system.session.v1=1) AS v1#x]
+Project [variablereference(system.session.v1=CAST(NULL AS INT)) AS v1#x]
 +- OneRowRelation
 
 
 -- !query
 EXECUTE IMMEDIATE 'SELECT 1 INTERSECT SELECT 1 INTERSECT SELECT 1' INTO v1
 -- !query analysis
-SetVariable [variablereference(system.session.v1=1)]
-+- Project [1 AS INTERSECT#x]
-   +- OneRowRelation
+SetVariable [variablereference(system.session.v1=CAST(NULL AS INT))]
++- Intersect false
+   :- Intersect false
+   :  :- Project [1 AS 1#x]
+   :  :  +- OneRowRelation
+   :  +- Project [1 AS 1#x]
+   :     +- OneRowRelation
+   +- Project [1 AS 1#x]
+      +- OneRowRelation
 
 
 -- !query
@@ -1084,9 +1120,22 @@ Project [variablereference(system.session.v1=1) AS v1#x]
 -- !query
 EXECUTE IMMEDIATE 'SELECT 1 JOIN SELECT 2' INTO v1
 -- !query analysis
-SetVariable [variablereference(system.session.v1=1)]
-+- Project [1 AS JOIN#x]
-   +- OneRowRelation
+org.apache.spark.sql.catalyst.parser.ParseException
+{
+  "errorClass" : "PARSE_SYNTAX_ERROR",
+  "sqlState" : "42601",
+  "messageParameters" : {
+    "error" : "'SELECT'",
+    "hint" : ""
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 1,
+    "stopIndex" : 50,
+    "fragment" : "EXECUTE IMMEDIATE 'SELECT 1 JOIN SELECT 2' INTO v1"
+  } ]
+}
 
 
 -- !query
@@ -1099,9 +1148,22 @@ Project [variablereference(system.session.v1=1) AS v1#x]
 -- !query
 EXECUTE IMMEDIATE 'SELECT 1 VALUES (1)' INTO v1
 -- !query analysis
-SetVariable [variablereference(system.session.v1=1)]
-+- Project [1 AS VALUES#x]
-   +- OneRowRelation
+org.apache.spark.sql.catalyst.parser.ParseException
+{
+  "errorClass" : "PARSE_SYNTAX_ERROR",
+  "sqlState" : "42601",
+  "messageParameters" : {
+    "error" : "'('",
+    "hint" : ""
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 1,
+    "stopIndex" : 47,
+    "fragment" : "EXECUTE IMMEDIATE 'SELECT 1 VALUES (1)' INTO v1"
+  } ]
+}
 
 
 -- !query
@@ -1114,9 +1176,22 @@ Project [variablereference(system.session.v1=1) AS v1#x]
 -- !query
 EXECUTE IMMEDIATE 'SELECT 1 alias garbage garbage garbage' INTO v1
 -- !query analysis
-SetVariable [variablereference(system.session.v1=1)]
-+- Project [1 AS alias#x]
-   +- OneRowRelation
+org.apache.spark.sql.catalyst.parser.ParseException
+{
+  "errorClass" : "PARSE_SYNTAX_ERROR",
+  "sqlState" : "42601",
+  "messageParameters" : {
+    "error" : "'garbage'",
+    "hint" : ""
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 1,
+    "stopIndex" : 66,
+    "fragment" : "EXECUTE IMMEDIATE 'SELECT 1 alias garbage garbage garbage' 
INTO v1"
+  } ]
+}
 
 
 -- !query
@@ -1129,9 +1204,22 @@ Project [variablereference(system.session.v1=1) AS v1#x]
 -- !query
 EXECUTE IMMEDIATE 'SELECT 1 WITH abc' INTO v1
 -- !query analysis
-SetVariable [variablereference(system.session.v1=1)]
-+- Project [1 AS WITH#x]
-   +- OneRowRelation
+org.apache.spark.sql.catalyst.parser.ParseException
+{
+  "errorClass" : "PARSE_SYNTAX_ERROR",
+  "sqlState" : "42601",
+  "messageParameters" : {
+    "error" : "'abc'",
+    "hint" : ": extra input 'abc'"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 1,
+    "stopIndex" : 45,
+    "fragment" : "EXECUTE IMMEDIATE 'SELECT 1 WITH abc' INTO v1"
+  } ]
+}
 
 
 -- !query
diff --git a/sql/core/src/test/resources/sql-tests/inputs/execute-immediate.sql 
b/sql/core/src/test/resources/sql-tests/inputs/execute-immediate.sql
index f7d27c6c0b03..3eaacbd37abc 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/execute-immediate.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/execute-immediate.sql
@@ -87,6 +87,10 @@ EXECUTE IMMEDIATE 'SELECT \'invalid_cast_error_expected\'' 
INTO res_id;
 -- require query when using INTO
 EXECUTE IMMEDIATE 'INSERT INTO x VALUES (?)' INTO res_id USING 1;
 
+-- require query when using INTO with SET VAR command
+DECLARE OR REPLACE testvarA INT;
+EXECUTE IMMEDIATE 'SET VAR testVarA = 1' INTO testVarA;
+
 -- use column in using - should fail as we expect variable here
 EXECUTE IMMEDIATE 'SELECT * FROM tbl_view WHERE ? = id' USING id;
 
@@ -109,8 +113,10 @@ EXECUTE IMMEDIATE b;
 SET VAR sql_string = 'SELECT * from tbl_view where name = :first or id = 
:second';
 SET VAR a = 'na';
 
--- expressions not supported - feature not supported
+-- constant expressions are supported
 EXECUTE IMMEDIATE 'SELECT * from tbl_view where name = :first' USING CONCAT(a 
, "me1") as first;
+
+-- subquery in using not supported
 EXECUTE IMMEDIATE 'SELECT * from tbl_view where name = :first' USING (SELECT 
42) as first, 'name2' as second;
 
 -- INTO variables not matching scalar types
@@ -140,10 +146,41 @@ EXECUTE IMMEDIATE 'SELECT id FROM tbl_view WHERE id = :p' 
USING p, 'p';
 EXECUTE IMMEDIATE 'SELECT id, data.f1 FROM tbl_view WHERE id = 10' INTO 
res_id, res_id;
 
 -- nested execute immediate
-EXECUTE IMMEDIATE 'EXECUTE IMMEDIATE \'SELECT id FROM tbl_view WHERE id = ? 
USING 10\'';
+EXECUTE IMMEDIATE 'EXECUTE IMMEDIATE \'SELECT id FROM tbl_view WHERE id = ?\' 
USING 10';
 
 -- sqlString is null
 SET VAR sql_string = null;
 EXECUTE IMMEDIATE sql_string;
 
-DROP TABLE x;
\ No newline at end of file
+-- sqlString is not a string
+SET VAR sql_string = 5;
+EXECUTE IMMEDIATE sql_string;
+
+-- sqlString is not a well-formed SQL statement.
+SET VAR sql_string = 'hello';
+EXECUTE IMMEDIATE length(sql_string);
+
+-- mixed positional and named parameters in query
+EXECUTE IMMEDIATE 'SELECT 42 where ? = :first' USING 1, 2 as first;
+
+-- non-string variable as sqlString parameter
+DECLARE int_var INT;
+SET VAR int_var = 42;
+EXECUTE IMMEDIATE int_var;
+
+-- null string as sqlString parameter
+DECLARE null_var STRING;
+SET VAR null_var = null;
+EXECUTE IMMEDIATE null_var;
+
+-- unsupported expression for parameter (subquery)
+EXECUTE IMMEDIATE 'SELECT ?' USING (SELECT 1);
+
+-- named query with unnamed parameters
+EXECUTE IMMEDIATE 'SELECT :first' USING 2, 3;
+
+-- Query is not a constant
+EXECUTE IMMEDIATE (SELECT c FROM (VALUES(1)) AS T(c));
+
+DROP TABLE x;
+
diff --git 
a/sql/core/src/test/resources/sql-tests/results/execute-immediate.sql.out 
b/sql/core/src/test/resources/sql-tests/results/execute-immediate.sql.out
index 9249d7eb3e51..8b44bef68c16 100644
--- a/sql/core/src/test/resources/sql-tests/results/execute-immediate.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/execute-immediate.sql.out
@@ -423,17 +423,39 @@ org.apache.spark.sql.AnalysisException
 
 
 -- !query
-EXECUTE IMMEDIATE 'SELECT * FROM tbl_view WHERE ? = id' USING id
+DECLARE OR REPLACE testvarA INT
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+EXECUTE IMMEDIATE 'SET VAR testVarA = 1' INTO testVarA
 -- !query schema
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
 {
-  "errorClass" : "UNRESOLVED_VARIABLE",
-  "sqlState" : "42883",
+  "errorClass" : "INVALID_STATEMENT_FOR_EXECUTE_INTO",
+  "sqlState" : "07501",
+  "messageParameters" : {
+    "sqlString" : "SET VAR TESTVARA = 1"
+  }
+}
+
+
+-- !query
+EXECUTE IMMEDIATE 'SELECT * FROM tbl_view WHERE ? = id' USING id
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "UNRESOLVED_COLUMN.WITHOUT_SUGGESTION",
+  "sqlState" : "42703",
   "messageParameters" : {
-    "searchPath" : "`system`.`session`",
-    "variableName" : "`id`"
+    "objectName" : "`id`"
   },
   "queryContext" : [ {
     "objectType" : "",
@@ -525,10 +547,10 @@ struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
 {
-  "errorClass" : "INVALID_VARIABLE_TYPE_FOR_QUERY_EXECUTE_IMMEDIATE",
+  "errorClass" : "INVALID_EXPR_TYPE_FOR_QUERY_EXECUTE_IMMEDIATE",
   "sqlState" : "42K09",
   "messageParameters" : {
-    "varType" : "\"INT\""
+    "exprType" : "\"INT\""
   }
 }
 
@@ -552,23 +574,9 @@ struct<>
 -- !query
 EXECUTE IMMEDIATE 'SELECT * from tbl_view where name = :first' USING CONCAT(a 
, "me1") as first
 -- !query schema
-struct<>
+struct<id:int,name:string,data:struct<f1:int,s2:struct<f2:int,f3:string>>>
 -- !query output
-org.apache.spark.sql.AnalysisException
-{
-  "errorClass" : "UNSUPPORTED_EXPR_FOR_PARAMETER",
-  "sqlState" : "42K0E",
-  "messageParameters" : {
-    "invalidExprSql" : "\"CONCAT(a, me1)\""
-  },
-  "queryContext" : [ {
-    "objectType" : "",
-    "objectName" : "",
-    "startIndex" : 70,
-    "stopIndex" : 86,
-    "fragment" : "CONCAT(a , \"me1\")"
-  } ]
-}
+10     name1   {"f1":1,"s2":{"f2":101,"f3":"a"}}
 
 
 -- !query
@@ -747,22 +755,38 @@ org.apache.spark.sql.AnalysisException
 
 
 -- !query
-EXECUTE IMMEDIATE 'EXECUTE IMMEDIATE \'SELECT id FROM tbl_view WHERE id = ? 
USING 10\''
+EXECUTE IMMEDIATE 'EXECUTE IMMEDIATE \'SELECT id FROM tbl_view WHERE id = ?\' 
USING 10'
+-- !query schema
+struct<id:int>
+-- !query output
+10
+
+
+-- !query
+SET VAR sql_string = null
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+EXECUTE IMMEDIATE sql_string
 -- !query schema
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
 {
-  "errorClass" : "NESTED_EXECUTE_IMMEDIATE",
-  "sqlState" : "07501",
+  "errorClass" : "NULL_QUERY_STRING_EXECUTE_IMMEDIATE",
+  "sqlState" : "22004",
   "messageParameters" : {
-    "sqlString" : "EXECUTE IMMEDIATE 'SELECT ID FROM TBL_VIEW WHERE ID = ? 
USING 10'"
+    "varName" : "`sql_string`"
   }
 }
 
 
 -- !query
-SET VAR sql_string = null
+SET VAR sql_string = 5
 -- !query schema
 struct<>
 -- !query output
@@ -774,12 +798,169 @@ EXECUTE IMMEDIATE sql_string
 -- !query schema
 struct<>
 -- !query output
+org.apache.spark.sql.catalyst.parser.ParseException
+{
+  "errorClass" : "PARSE_SYNTAX_ERROR",
+  "sqlState" : "42601",
+  "messageParameters" : {
+    "error" : "'5'",
+    "hint" : ""
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 1,
+    "stopIndex" : 28,
+    "fragment" : "EXECUTE IMMEDIATE sql_string"
+  } ]
+}
+
+
+-- !query
+SET VAR sql_string = 'hello'
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+EXECUTE IMMEDIATE length(sql_string)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "INVALID_EXPR_TYPE_FOR_QUERY_EXECUTE_IMMEDIATE",
+  "sqlState" : "42K09",
+  "messageParameters" : {
+    "exprType" : "\"INT\""
+  }
+}
+
+
+-- !query
+EXECUTE IMMEDIATE 'SELECT 42 where ? = :first' USING 1, 2 as first
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "INVALID_QUERY_MIXED_QUERY_PARAMETERS",
+  "sqlState" : "42613"
+}
+
+
+-- !query
+DECLARE int_var INT
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+SET VAR int_var = 42
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+EXECUTE IMMEDIATE int_var
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "INVALID_EXPR_TYPE_FOR_QUERY_EXECUTE_IMMEDIATE",
+  "sqlState" : "42K09",
+  "messageParameters" : {
+    "exprType" : "\"INT\""
+  }
+}
+
+
+-- !query
+DECLARE null_var STRING
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+SET VAR null_var = null
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+EXECUTE IMMEDIATE null_var
+-- !query schema
+struct<>
+-- !query output
 org.apache.spark.sql.AnalysisException
 {
   "errorClass" : "NULL_QUERY_STRING_EXECUTE_IMMEDIATE",
   "sqlState" : "22004",
   "messageParameters" : {
-    "varName" : "`sql_string`"
+    "varName" : "`null_var`"
+  }
+}
+
+
+-- !query
+EXECUTE IMMEDIATE 'SELECT ?' USING (SELECT 1)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "UNSUPPORTED_EXPR_FOR_PARAMETER",
+  "sqlState" : "42K0E",
+  "messageParameters" : {
+    "invalidExprSql" : "\"scalarsubquery()\""
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 36,
+    "stopIndex" : 45,
+    "fragment" : "(SELECT 1)"
+  } ]
+}
+
+
+-- !query
+EXECUTE IMMEDIATE 'SELECT :first' USING 2, 3
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "ALL_PARAMETERS_MUST_BE_NAMED",
+  "sqlState" : "07001",
+  "messageParameters" : {
+    "exprs" : "\"2\", \"3\""
+  }
+}
+
+
+-- !query
+EXECUTE IMMEDIATE (SELECT c FROM (VALUES(1)) AS T(c))
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "INVALID_EXPR_TYPE_FOR_QUERY_EXECUTE_IMMEDIATE",
+  "sqlState" : "42K09",
+  "messageParameters" : {
+    "exprType" : "\"INT\""
   }
 }
 
diff --git 
a/sql/core/src/test/resources/sql-tests/results/parse-query-correctness-old-behavior.sql.out
 
b/sql/core/src/test/resources/sql-tests/results/parse-query-correctness-old-behavior.sql.out
index 6c9751864324..9a523e9562f9 100644
--- 
a/sql/core/src/test/resources/sql-tests/results/parse-query-correctness-old-behavior.sql.out
+++ 
b/sql/core/src/test/resources/sql-tests/results/parse-query-correctness-old-behavior.sql.out
@@ -614,7 +614,11 @@ EXECUTE IMMEDIATE 'SELECT 1 UNION SELECT 2 UNION SELECT 3' 
INTO v1
 -- !query schema
 struct<>
 -- !query output
-
+org.apache.spark.SparkException
+{
+  "errorClass" : "ROW_SUBQUERY_TOO_MANY_ROWS",
+  "sqlState" : "21000"
+}
 
 
 -- !query
@@ -622,7 +626,7 @@ SELECT v1
 -- !query schema
 struct<v1:int>
 -- !query output
-1
+NULL
 
 
 -- !query
@@ -670,7 +674,7 @@ SELECT v1
 -- !query schema
 struct<v1:int>
 -- !query output
-1
+NULL
 
 
 -- !query
@@ -686,7 +690,7 @@ SELECT v1
 -- !query schema
 struct<v1:int>
 -- !query output
-1
+NULL
 
 
 -- !query
@@ -710,7 +714,22 @@ EXECUTE IMMEDIATE 'SELECT 1 JOIN SELECT 2' INTO v1
 -- !query schema
 struct<>
 -- !query output
-
+org.apache.spark.sql.catalyst.parser.ParseException
+{
+  "errorClass" : "PARSE_SYNTAX_ERROR",
+  "sqlState" : "42601",
+  "messageParameters" : {
+    "error" : "'SELECT'",
+    "hint" : ""
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 1,
+    "stopIndex" : 50,
+    "fragment" : "EXECUTE IMMEDIATE 'SELECT 1 JOIN SELECT 2' INTO v1"
+  } ]
+}
 
 
 -- !query
@@ -726,7 +745,22 @@ EXECUTE IMMEDIATE 'SELECT 1 VALUES (1)' INTO v1
 -- !query schema
 struct<>
 -- !query output
-
+org.apache.spark.sql.catalyst.parser.ParseException
+{
+  "errorClass" : "PARSE_SYNTAX_ERROR",
+  "sqlState" : "42601",
+  "messageParameters" : {
+    "error" : "'('",
+    "hint" : ""
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 1,
+    "stopIndex" : 47,
+    "fragment" : "EXECUTE IMMEDIATE 'SELECT 1 VALUES (1)' INTO v1"
+  } ]
+}
 
 
 -- !query
@@ -742,7 +776,22 @@ EXECUTE IMMEDIATE 'SELECT 1 alias garbage garbage garbage' 
INTO v1
 -- !query schema
 struct<>
 -- !query output
-
+org.apache.spark.sql.catalyst.parser.ParseException
+{
+  "errorClass" : "PARSE_SYNTAX_ERROR",
+  "sqlState" : "42601",
+  "messageParameters" : {
+    "error" : "'garbage'",
+    "hint" : ""
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 1,
+    "stopIndex" : 66,
+    "fragment" : "EXECUTE IMMEDIATE 'SELECT 1 alias garbage garbage garbage' 
INTO v1"
+  } ]
+}
 
 
 -- !query
@@ -758,7 +807,22 @@ EXECUTE IMMEDIATE 'SELECT 1 WITH abc' INTO v1
 -- !query schema
 struct<>
 -- !query output
-
+org.apache.spark.sql.catalyst.parser.ParseException
+{
+  "errorClass" : "PARSE_SYNTAX_ERROR",
+  "sqlState" : "42601",
+  "messageParameters" : {
+    "error" : "'abc'",
+    "hint" : ": extra input 'abc'"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 1,
+    "stopIndex" : 45,
+    "fragment" : "EXECUTE IMMEDIATE 'SELECT 1 WITH abc' INTO v1"
+  } ]
+}
 
 
 -- !query
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryParsingErrorsSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryParsingErrorsSuite.scala
index 666f85e19c1c..629d85f19b0a 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryParsingErrorsSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryParsingErrorsSuite.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.errors
 
 import org.apache.spark.SparkThrowable
-import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.{AnalysisException, QueryTest}
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.plans.SQLHelper
 import org.apache.spark.sql.catalyst.util.TypeUtils.toSQLId
@@ -62,16 +62,16 @@ class QueryParsingErrorsSuite extends QueryTest with 
SharedSparkSession with SQL
     )
   }
 
-  test("PARSE_SYNTAX_ERROR: Execute immediate syntax error with INTO 
specified") {
+  test("UNRESOLVED_COLUMN: Execute immediate with unresolved column in INTO 
clause") {
     val query = "EXECUTE IMMEDIATE 'SELCT 1707 WHERE ? = 1' INTO a USING 1"
     checkError(
-      exception = parseException(query),
-      condition = "PARSE_SYNTAX_ERROR",
-      parameters = Map("error" -> "'SELCT'", "hint" -> ""),
+      exception = intercept[AnalysisException](sql(query)),
+      condition = "UNRESOLVED_COLUMN.WITHOUT_SUGGESTION",
+      parameters = Map("objectName" -> "`a`"),
       context = ExpectedContext(
-        start = 0,
-        stop = 56,
-        fragment = query)
+        start = 48,
+        stop = 48,
+        fragment = "a")
     )
   }
 
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
index f9bebce7cbfa..aa801b6e2f68 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
@@ -25,7 +25,7 @@ import org.apache.hadoop.hive.ql.exec.{UDAF, UDF}
 import org.apache.hadoop.hive.ql.udf.generic.{AbstractGenericUDAFResolver, 
GenericUDF, GenericUDTF}
 
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.analysis.{Analyzer, 
EvalSubqueriesForTimeTravel, InvokeProcedures, ReplaceCharWithVarchar, 
ResolveDataSource, ResolveSessionCatalog, ResolveTranspose}
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, 
EvalSubqueriesForTimeTravel, InvokeProcedures, ReplaceCharWithVarchar, 
ResolveDataSource, ResolveExecuteImmediate, ResolveSessionCatalog, 
ResolveTranspose}
 import org.apache.spark.sql.catalyst.analysis.resolver.ResolverExtension
 import org.apache.spark.sql.catalyst.catalog.{ExternalCatalogWithListener, 
InvalidUDFClassException}
 import org.apache.spark.sql.catalyst.expressions.{Expression, 
ExtractSemiStructuredFields}
@@ -133,6 +133,7 @@ class HiveSessionStateBuilder(
         new DetermineTableStats(session) +:
         new ResolveTranspose(session) +:
         new InvokeProcedures(session) +:
+        ResolveExecuteImmediate(session, catalogManager) +:
         ExtractSemiStructuredFields +:
         customResolutionRules
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
