This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new f92601ce879b [SPARK-53444][SQL] Rework execute immediate
f92601ce879b is described below

commit f92601ce879b1f2220387d2f720b533d5ff08e21
Author: Serge Rielau <se...@rielau.com>
AuthorDate: Sat Sep 13 21:32:48 2025 +0800

[SPARK-53444][SQL] Rework execute immediate

### What changes were proposed in this pull request?

This PR significantly reworks the existing `EXECUTE IMMEDIATE` SQL statement implementation to improve separation of concerns, robustness, and functionality:

1. **Architecture Redesign**:
   - Refactored from direct execution to a nested invocation model using `sparkSession.sql()`
   - Improved separation between analysis-time validation and execution-time processing
   - Enhanced isolation between outer and inner query contexts
2. **Enhanced Query String Support**:
   - **NEW**: Support for constant expressions as SQL statement arguments (previously only string literals)
3. **Improved Parameter Handling**:
   - Redesigned parameter processing with unified parameter arrays
   - Enhanced validation for mixing positional and named parameters
   - Better error messages with `UNSUPPORTED_EXPR_FOR_PARAMETER` for invalid expressions
4. **Robust SQL Scripting Integration**:
   - **NEW**: Complete isolation of local variables from nested `EXECUTE IMMEDIATE` calls (see the sketch after the user-facing examples below)
   - Preserved session variable functionality within `EXECUTE IMMEDIATE`
   - **NEW**: Proper command vs. query result distinction (commands don't produce result sets)
5. **Enhanced Error Handling**:
   - Removed unnecessary exception wrapping for cleaner error propagation
   - **NEW**: Multi-level context error reporting showing both outer and inner query context
   - Comprehensive validation with appropriate error classes
6. **Code Quality Improvements**:
   - Eliminated code duplication in parameter validation logic
   - Consolidated imports and cleaned up unused code
   - Improved method naming and documentation

Open issues: The code that tests whether a query uses named or positional parameter markers is not yet robust and needs more work.

### Why are the changes needed?

The existing implementation was brittle and could not handle transitive closure (e.g., nested EXECUTE IMMEDIATE, or SQL scripting inside EXECUTE IMMEDIATE). There is also a follow-on PR dealing with improved parameter substitution, which is blocked from supporting EXECUTE IMMEDIATE until this PR is delivered. Lastly, this change lays the foundation for nesting/chaining error contexts into SQL-level stacks.

### Does this PR introduce _any_ user-facing change?

**Enhanced Functionality (Backward Compatible):**
```sql
-- NEW: Expression concatenation
EXECUTE IMMEDIATE 'SELECT * FROM ' || table_name || ' WHERE active = true';

-- EXISTING: All previous syntax continues to work
EXECUTE IMMEDIATE 'SELECT * FROM table_name';
EXECUTE IMMEDIATE 'SELECT * FROM table_name WHERE id = ?' USING 123;
EXECUTE IMMEDIATE 'SELECT name FROM users WHERE id = ?' INTO user_name USING 123;
```

A couple of error conditions now return more general codes due to improved orthogonality. All existing valid `EXECUTE IMMEDIATE` statements continue to work as before.
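To make the scripting isolation in item 4 concrete, here is a minimal SQL sketch of the intended behavior; the variable names (`sess_var`, `local_var`) are illustrative, not taken from this patch:

```sql
-- Hypothetical sketch: session variables remain visible inside EXECUTE IMMEDIATE,
-- script-local variables do not.
DECLARE VARIABLE sess_var STRING DEFAULT 'session value';

BEGIN
  DECLARE local_var STRING DEFAULT 'local value';
  EXECUTE IMMEDIATE 'SELECT sess_var';   -- OK: resolves to the session variable
  EXECUTE IMMEDIATE 'SELECT local_var';  -- fails to resolve: local variables are isolated
END;
```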
### How was this patch tested?

1. **Regression Testing**:
   - All existing `EXECUTE IMMEDIATE` tests continue to pass
   - Verified backward compatibility with existing syntax and behavior
2. **Enhanced Test Coverage**:
   - **NEW**: Tests for variable references and expression concatenation in query strings
   - **NEW**: Tests for proper local variable isolation in SQL scripting contexts
   - **NEW**: Tests for command vs. query result handling
   - **EXPANDED**: Error condition testing with new validation logic
3. **Integration Testing**:
   - `SqlScriptingExecutionSuite` tests for SQL scripting integration
   - Golden file tests (`execute-immediate.sql`) for analyzer and execution results
   - Cross-feature testing with variables, parameters, and nested contexts
4. **Error Handling Validation**:
   - Comprehensive testing of all error classes and messages
   - Validation that exceptions propagate naturally without wrapping
   - Multi-level error context testing

The reworked implementation passes all existing tests plus extensive new test coverage, ensuring both backward compatibility and enhanced functionality.

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude 3.5 Sonnet (Anthropic) - Used for code review, refactoring suggestions, and implementation guidance. All core architectural decisions, SQL semantics, and implementation logic were designed and developed by human developers.

Closes #52173 from srielau/rework-execute-immediate.

Lead-authored-by: Serge Rielau <se...@rielau.com>
Co-authored-by: Serge Rielau <srie...@users.noreply.github.com>
Co-authored-by: Wenchen Fan <cloud0...@gmail.com>
Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../src/main/resources/error/error-conditions.json |  12 +-
 .../spark/sql/catalyst/parser/SqlBaseParser.g4     |  15 +-
 .../spark/sql/catalyst/analysis/Analyzer.scala     |  18 --
 .../catalyst/analysis/ColumnResolutionHelper.scala |   1 -
 .../sql/catalyst/analysis/ResolveCatalogs.scala    |   3 +-
 .../sql/catalyst/analysis/ResolveSetVariable.scala |  36 ++-
 .../sql/catalyst/analysis/VariableResolution.scala |  22 +-
 .../sql/catalyst/analysis/executeImmediate.scala   | 218 ----------------
 .../spark/sql/catalyst/analysis/parameters.scala   |  87 ++++++-
 .../spark/sql/catalyst/analysis/unresolved.scala   |  18 +-
 .../spark/sql/catalyst/parser/AstBuilder.scala     |  13 +-
 .../sql/catalyst/rules/RuleIdCollection.scala      |   2 +-
 .../spark/sql/errors/QueryCompilationErrors.scala  |  12 +-
 .../sql/catalyst/analysis/AnalysisErrorSuite.scala |  78 ------
 .../sql/catalyst/analysis/AnalysisSuite.scala      |  32 +--
 .../analysis/ResolveExecuteImmediate.scala         | 208 +++++++++++++++
 .../apache/spark/sql/classic/SparkSession.scala    |  43 ++-
 .../spark/sql/execution/command/SetCommand.scala   |   5 +-
 .../sql/internal/BaseSessionStateBuilder.scala     |   3 +-
 .../analyzer-results/execute-immediate.sql.out     | 289 +++++++++++++++++----
 .../parse-query-correctness-old-behavior.sql.out   | 150 ++++++++---
 .../sql-tests/inputs/execute-immediate.sql         |  43 ++-
 .../sql-tests/results/execute-immediate.sql.out    | 239 ++++++++++++++---
 .../parse-query-correctness-old-behavior.sql.out   |  80 +++++-
 .../spark/sql/errors/QueryParsingErrorsSuite.scala |  16 +-
 .../spark/sql/hive/HiveSessionStateBuilder.scala   |   3 +-
 26 files changed, 1086 insertions(+), 560 deletions(-)
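Before the diff: the core of the rework is a new internal `sql(sqlText, args, paramNames)` overload backed by a `GeneralParameterizedQuery` node, where an empty name marks a positional argument. A simplified sketch of the binding decision it enforces (names and structure are illustrative, not the committed code):

```scala
// Illustrative sketch of GeneralParameterizedQuery's binding rules; not the committed code.
// An argument with an empty name is positional; a non-empty name comes from `USING expr AS name`.
final case class Arg(name: String, value: Any)

def chooseBinding(hasNamedMarkers: Boolean, hasPositionalMarkers: Boolean, args: Seq[Arg]): String =
  if (hasNamedMarkers && hasPositionalMarkers)
    "error: INVALID_QUERY_MIXED_QUERY_PARAMETERS" // ? and :name cannot be mixed in one query
  else if (hasNamedMarkers && args.exists(_.name.isEmpty))
    "error: ALL_PARAMETERS_MUST_BE_NAMED"         // every USING expression needs a name
  else if (hasNamedMarkers)
    "bind by name"                                // look each :name up among the named args
  else
    "bind by position"                            // match args to ? markers in order
```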
diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index 8bbf24074f35..06e6c2f14d86 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -2742,6 +2742,12 @@ ], "sqlState" : "42001" }, + "INVALID_EXPR_TYPE_FOR_QUERY_EXECUTE_IMMEDIATE" : { + "message" : [ + "Expression type must be string type but got <exprType>." + ], + "sqlState" : "42K09" + }, "INVALID_EXTERNAL_TYPE" : { "message" : [ "The external type <externalType> is not valid for the type <type> at the expression <expr>." @@ -3919,12 +3925,6 @@ }, "sqlState" : "42K0M" }, - "INVALID_VARIABLE_TYPE_FOR_QUERY_EXECUTE_IMMEDIATE" : { - "message" : [ - "Variable type must be string type but got <varType>." - ], - "sqlState" : "42K09" - }, "INVALID_VARIANT_CAST" : { "message" : [ "The variant value `<value>` cannot be cast into `<dataType>`. Please use `try_variant_get` instead."
diff --git a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 index e63a229a3207..8efab99d4ec8 100644 --- a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 +++ b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 @@ -397,7 +397,7 @@ setResetStatement ; executeImmediate - : EXECUTE IMMEDIATE queryParam=executeImmediateQueryParam (INTO targetVariable=multipartIdentifierList)? executeImmediateUsing? + : EXECUTE IMMEDIATE queryParam=expression (INTO targetVariable=multipartIdentifierList)? executeImmediateUsing? ; executeImmediateUsing @@ -405,19 +405,6 @@ executeImmediateUsing | USING params=namedExpressionSeq ; -executeImmediateQueryParam - : stringLit - | multipartIdentifier - ; - -executeImmediateArgument - : (constant|multipartIdentifier) (AS name=errorCapturingIdentifier)? - ; - -executeImmediateArgumentSeq - : executeImmediateArgument (COMMA executeImmediateArgument)* - ; - timezone : stringLit | LOCAL
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 4a360fa1da7e..3cef19cbad53 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -135,9 +135,6 @@ object FakeV2SessionCatalog extends TableCatalog with FunctionCatalog with Suppo * if `t` was a permanent table when the current view was created, it * should still be a permanent table when resolving the current view, * even if a temp view `t` has been created. - * @param isExecuteImmediate Whether the current plan is created by EXECUTE IMMEDIATE. Used when - * resolving variables, as SQL Scripting local variables should not be - * visible from EXECUTE IMMEDIATE. * @param outerPlan The query plan from the outer query that can be used to resolve star * expressions in a subquery.
*/ @@ -155,7 +152,6 @@ case class AnalysisContext( referredTempFunctionNames: mutable.Set[String] = mutable.Set.empty, referredTempVariableNames: Seq[Seq[String]] = Seq.empty, outerPlan: Option[LogicalPlan] = None, - isExecuteImmediate: Boolean = false, collation: Option[String] = None, /** @@ -212,19 +208,11 @@ object AnalysisContext { viewDesc.viewReferredTempViewNames, mutable.Set(viewDesc.viewReferredTempFunctionNames: _*), viewDesc.viewReferredTempVariableNames, - isExecuteImmediate = originContext.isExecuteImmediate, collation = viewDesc.collation) set(context) try f finally { set(originContext) } } - def withExecuteImmediateContext[A](f: => A): A = { - val originContext = value.get() - val context = originContext.copy(isExecuteImmediate = true) - - set(context) - try f finally { set(originContext) } - } def withNewAnalysisContext[A](f: => A): A = { val originContext = value.get() @@ -495,10 +483,6 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor RewriteMergeIntoTable :: MoveParameterizedQueriesDown :: BindParameters :: - new SubstituteExecuteImmediate( - catalogManager, - resolveChild = executeSameContext, - checkAnalysis = checkAnalysis) :: typeCoercionRules() ++ Seq( ResolveWithCTE, @@ -1802,8 +1786,6 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor case s: Sort if !s.resolved || s.missingInput.nonEmpty => resolveReferencesInSort(s) - // Pass for Execute Immediate as arguments will be resolved by [[SubstituteExecuteImmediate]]. - case e : ExecuteImmediateQuery => e case d: DataFrameDropColumns if !d.resolved => resolveDataFrameDropColumns(d) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala index bade5f0bee9d..3224ccafafec 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala @@ -241,7 +241,6 @@ trait ColumnResolutionHelper extends Logging with DataTypeErrorsBase { variableResolution.resolveMultipartName( nameParts = nameParts, resolvingView = AnalysisContext.get.catalogAndNamespace.nonEmpty, - resolvingExecuteImmediate = AnalysisContext.get.isExecuteImmediate, referredTempVariableNames = AnalysisContext.get.referredTempVariableNames ).map(e => Alias(e, nameParts.last)()) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala index 851db598c4e3..6307ccd5b975 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala @@ -142,8 +142,7 @@ class ResolveCatalogs(val catalogManager: CatalogManager) } private def withinSqlScript: Boolean = - SqlScriptingContextManager.get().map(_.getVariableManager).isDefined && - !AnalysisContext.get.isExecuteImmediate + SqlScriptingContextManager.get().map(_.getVariableManager).isDefined private def assertValidSessionVariableNameParts( nameParts: Seq[String], diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSetVariable.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSetVariable.scala index ab4408435767..4b16448641bc 100644 --- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSetVariable.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSetVariable.scala @@ -35,6 +35,22 @@ class ResolveSetVariable(val catalogManager: CatalogManager) extends Rule[Logica with ColumnResolutionHelper { private val variableResolution = new VariableResolution(catalogManager.tempVariableManager) + /** + * Checks for duplicate variable names and throws an exception if found. + * Names are normalized when the variables are created. + * No need for case insensitive comparison here. + */ + private def checkForDuplicateVariables(variables: Seq[VariableReference]): Unit = { + // TODO: we need to group by the qualified variable name once other catalogs support it. + val dups = variables.groupBy(_.identifier).filter(kv => kv._2.length > 1) + if (dups.nonEmpty) { + throw new AnalysisException( + errorClass = "DUPLICATE_ASSIGNMENTS", + messageParameters = Map("nameList" -> + dups.keys.map(key => toSQLId(key.name())).mkString(", "))) + } + } + override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsWithPruning( _.containsPattern(COMMAND), ruleId) { // Resolve the left hand side of the SET VAR command @@ -42,8 +58,7 @@ class ResolveSetVariable(val catalogManager: CatalogManager) extends Rule[Logica val resolvedVars = setVariable.targetVariables.map { case u: UnresolvedAttribute => variableResolution.lookupVariable( - nameParts = u.nameParts, - resolvingExecuteImmediate = AnalysisContext.get.isExecuteImmediate + nameParts = u.nameParts ) match { case Some(variable) => variable.copy(canFold = false) case _ => throw unresolvedVariableError(u.nameParts, Seq("SYSTEM", "SESSION")) @@ -53,24 +68,17 @@ class ResolveSetVariable(val catalogManager: CatalogManager) extends Rule[Logica "Unexpected target variable expression in SetVariable: " + other) } - // Protect against duplicate variable names - // Names are normalized when the variables are created. - // No need for case insensitive comparison here. - // TODO: we need to group by the qualified variable name once other catalogs support it. - val dups = resolvedVars.groupBy(_.identifier).filter(kv => kv._2.length > 1) - if (dups.nonEmpty) { - throw new AnalysisException( - errorClass = "DUPLICATE_ASSIGNMENTS", - messageParameters = Map("nameList" -> - dups.keys.map(key => toSQLId(key.name())).mkString(", "))) - } - setVariable.copy(targetVariables = resolvedVars) case setVariable: SetVariable if setVariable.targetVariables.forall(_.isInstanceOf[VariableReference]) && setVariable.sourceQuery.resolved => val targetVariables = setVariable.targetVariables.map(_.asInstanceOf[VariableReference]) + + // Check for duplicate variable names - this handles both regular SET VAR (after resolution) + // and EXECUTE IMMEDIATE ... 
INTO (which comes pre-resolved) + checkForDuplicateVariables(targetVariables) + val withCasts = TableOutputResolver.resolveVariableOutputColumns( targetVariables, setVariable.sourceQuery, conf) val withLimit = if (withCasts.maxRows.exists(_ <= 2)) { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/VariableResolution.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/VariableResolution.scala index 72af7c619a08..2a1f35b0859d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/VariableResolution.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/VariableResolution.scala @@ -52,9 +52,6 @@ class VariableResolution(tempVariableManager: TempVariableManager) extends SQLCo * (e.g., ["catalog", "schema", "variable", "field1", "field2"]) * @param resolvingView Whether this resolution is happening within a view context. * When true, only variables explicitly referred to in the view definition are accessible. - * @param resolvingExecuteImmediate Whether this resolution is happening within an - * EXECUTE IMMEDIATE context. When true, local variables are not accessible, only session - * variables. * @param referredTempVariableNames When resolving within a view, this contains the list of * variable names that the view explicitly references and should have access to. * @@ -65,7 +62,6 @@ class VariableResolution(tempVariableManager: TempVariableManager) extends SQLCo def resolveMultipartName( nameParts: Seq[String], resolvingView: Boolean, - resolvingExecuteImmediate: Boolean, referredTempVariableNames: Seq[Seq[String]]): Option[Expression] = { var resolvedVariable: Option[Expression] = None // We only support temp variables for now, so the variable name can at most have 3 parts. @@ -76,7 +72,6 @@ class VariableResolution(tempVariableManager: TempVariableManager) extends SQLCo resolvedVariable = resolveVariable( nameParts = nameParts.dropRight(numInnerFields), resolvingView = resolvingView, - resolvingExecuteImmediate = resolvingExecuteImmediate, referredTempVariableNames = referredTempVariableNames ) @@ -99,16 +94,12 @@ class VariableResolution(tempVariableManager: TempVariableManager) extends SQLCo /** * Look up variable by nameParts. - * If in SQL Script, first check local variables, unless in EXECUTE IMMEDIATE - * (EXECUTE IMMEDIATE generated query cannot access local variables). - * if not found fall back to session variables. + * If in SQL Script, first check local variables. + * If not found fall back to session variables. * @param nameParts NameParts of the variable. - * @param resolvingExecuteImmediate Whether the current context is in EXECUTE IMMEDIATE. * @return Reference to the variable. */ - def lookupVariable( - nameParts: Seq[String], - resolvingExecuteImmediate: Boolean): Option[VariableReference] = { + def lookupVariable(nameParts: Seq[String]): Option[VariableReference] = { val namePartsCaseAdjusted = if (conf.caseSensitiveAnalysis) { nameParts } else { @@ -118,8 +109,6 @@ class VariableResolution(tempVariableManager: TempVariableManager) extends SQLCo SqlScriptingContextManager .get() .map(_.getVariableManager) - // If we are in EXECUTE IMMEDIATE lookup only session variables. - .filterNot(_ => resolvingExecuteImmediate) // If variable name is qualified with session.<varName> treat it as a session variable. 
.filterNot( _ => @@ -156,16 +145,15 @@ class VariableResolution(tempVariableManager: TempVariableManager) extends SQLCo private def resolveVariable( nameParts: Seq[String], resolvingView: Boolean, - resolvingExecuteImmediate: Boolean, referredTempVariableNames: Seq[Seq[String]]): Option[Expression] = { if (resolvingView) { if (referredTempVariableNames.contains(nameParts)) { - lookupVariable(nameParts = nameParts, resolvingExecuteImmediate = resolvingExecuteImmediate) + lookupVariable(nameParts = nameParts) } else { None } } else { - lookupVariable(nameParts = nameParts, resolvingExecuteImmediate = resolvingExecuteImmediate) + lookupVariable(nameParts = nameParts) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/executeImmediate.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/executeImmediate.scala deleted file mode 100644 index b926cdf57f16..000000000000 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/executeImmediate.scala +++ /dev/null @@ -1,218 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.catalyst.analysis - -import scala.util.{Either, Left, Right} - -import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, VariableReference} -import org.apache.spark.sql.catalyst.parser.ParseException -import org.apache.spark.sql.catalyst.plans.logical.{CompoundBody, LogicalPlan, SetVariable} -import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.catalyst.trees.TreePattern.{EXECUTE_IMMEDIATE, TreePattern} -import org.apache.spark.sql.connector.catalog.CatalogManager -import org.apache.spark.sql.errors.QueryCompilationErrors -import org.apache.spark.sql.types.StringType - -/** - * Logical plan representing execute immediate query. - * - * @param args parameters of query - * @param query query string or variable - * @param targetVariables variables to store the result of the query - */ -case class ExecuteImmediateQuery( - args: Seq[Expression], - query: Either[String, UnresolvedAttribute], - targetVariables: Seq[UnresolvedAttribute]) - extends UnresolvedLeafNode { - final override val nodePatterns: Seq[TreePattern] = Seq(EXECUTE_IMMEDIATE) -} - -/** - * This rule substitutes execute immediate query node with fully analyzed - * plan that is passed as string literal or session parameter. 
- */ -class SubstituteExecuteImmediate( - val catalogManager: CatalogManager, - resolveChild: LogicalPlan => LogicalPlan, - checkAnalysis: LogicalPlan => Unit) - extends Rule[LogicalPlan] { - private val variableResolution = new VariableResolution(catalogManager.tempVariableManager) - - def resolveVariable(e: Expression): Expression = { - - /** - * We know that the expression is either UnresolvedAttribute, Alias or Parameter, as passed from - * the parser. If it is an UnresolvedAttribute, we look it up in the catalog and return it. If - * it is an Alias, we resolve the child and return an Alias with the same name. If it is - * a Parameter, we leave it as is because the parameter belongs to another parameterized - * query and should be resolved later. - */ - e match { - case u: UnresolvedAttribute => - getVariableReference(u, u.nameParts) - case a: Alias => - Alias(resolveVariable(a.child), a.name)() - case p: Parameter => p - case other => - throw QueryCompilationErrors.unsupportedParameterExpression(other) - } - } - - def resolveArguments(expressions: Seq[Expression]): Seq[Expression] = { - expressions.map { exp => - if (exp.resolved) { - exp - } else { - resolveVariable(exp) - } - } - } - - def extractQueryString(either: Either[String, UnresolvedAttribute]): String = { - either match { - case Left(v) => v - case Right(u) => - val varReference = getVariableReference(u, u.nameParts) - - if (!varReference.dataType.sameType(StringType)) { - throw QueryCompilationErrors.invalidExecuteImmediateVariableType(varReference.dataType) - } - - // Call eval with null value passed instead of a row. - // This is ok as this is variable and invoking eval should - // be independent of row value. - val varReferenceValue = varReference.eval(null) - - if (varReferenceValue == null) { - throw QueryCompilationErrors.nullSQLStringExecuteImmediate(u.name) - } - - varReferenceValue.toString - } - } - - override def apply(plan: LogicalPlan): LogicalPlan = - plan.resolveOperatorsWithPruning(_.containsPattern(EXECUTE_IMMEDIATE), ruleId) { - case e @ ExecuteImmediateQuery(expressions, _, _) if expressions.exists(!_.resolved) => - e.copy(args = resolveArguments(expressions)) - - case ExecuteImmediateQuery(expressions, query, targetVariables) - if expressions.forall(_.resolved) => - - val queryString = extractQueryString(query) - val plan = parseStatement(queryString, targetVariables) - - val posNodes = plan.collect { case p: LogicalPlan => - p.expressions.flatMap(_.collect { case n: PosParameter => n }) - }.flatten - val namedNodes = plan.collect { case p: LogicalPlan => - p.expressions.flatMap(_.collect { case n: NamedParameter => n }) - }.flatten - - val queryPlan = if (expressions.isEmpty || (posNodes.isEmpty && namedNodes.isEmpty)) { - plan - } else if (posNodes.nonEmpty && namedNodes.nonEmpty) { - throw QueryCompilationErrors.invalidQueryMixedQueryParameters() - } else { - if (posNodes.nonEmpty) { - PosParameterizedQuery(plan, expressions) - } else { - val aliases = expressions.collect { - case e: Alias => e - case u: VariableReference => Alias(u, u.identifier.name())() - } - - if (aliases.size != expressions.size) { - val nonAliases = expressions.filter(attr => - !attr.isInstanceOf[Alias] && !attr.isInstanceOf[VariableReference]) - - throw QueryCompilationErrors.invalidQueryAllParametersMustBeNamed(nonAliases) - } - - NameParameterizedQuery( - plan, - aliases.map(_.name), - // We need to resolve arguments before Resolution batch to make sure - // that some rule does not accidentally resolve our parameters. 
- // We do not want this as they can resolve some unsupported parameters. - aliases) - } - } - - // Fully analyze the generated plan. AnalysisContext.withExecuteImmediateContext makes sure - // that SQL scripting local variables will not be accessed from the plan. - val finalPlan = AnalysisContext.withExecuteImmediateContext { - resolveChild(queryPlan) - } - checkAnalysis(finalPlan) - - if (targetVariables.nonEmpty) { - SetVariable(targetVariables, finalPlan) - } else { finalPlan } - } - - private def parseStatement( - queryString: String, - targetVariables: Seq[Expression]): LogicalPlan = { - // If targetVariables is defined, statement needs to be a query. - // Otherwise, it can be anything. - val plan = if (targetVariables.nonEmpty) { - try { - catalogManager.v1SessionCatalog.parser.parseQuery(queryString) - } catch { - case e: ParseException => - // Since we do not have a way of telling that parseQuery failed because of - // actual parsing error or because statement was passed where query was expected, - // we need to make sure that parsePlan wouldn't throw - catalogManager.v1SessionCatalog.parser.parsePlan(queryString) - - // Plan was successfully parsed, but query wasn't - throw. - throw QueryCompilationErrors.invalidStatementForExecuteInto(queryString) - } - } else { - catalogManager.v1SessionCatalog.parser.parsePlan(queryString) - } - - if (plan.isInstanceOf[CompoundBody]) { - throw QueryCompilationErrors.sqlScriptInExecuteImmediate(queryString) - } - - // do not allow nested execute immediate - if (plan.containsPattern(EXECUTE_IMMEDIATE)) { - throw QueryCompilationErrors.nestedExecuteImmediate(queryString) - } - - plan - } - - private def getVariableReference(expr: Expression, nameParts: Seq[String]): VariableReference = { - variableResolution.lookupVariable( - nameParts = nameParts, - resolvingExecuteImmediate = AnalysisContext.get.isExecuteImmediate - ) match { - case Some(variable) => variable - case _ => - throw QueryCompilationErrors - .unresolvedVariableError( - nameParts, - Seq(CatalogManager.SYSTEM_CATALOG_NAME, CatalogManager.SESSION_NAMESPACE), - expr.origin) - } - } -} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/parameters.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/parameters.scala index 2cfc2a8c90dc..b3200be95f69 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/parameters.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/parameters.scala @@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, CreateArray, CreateMap, import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SupervisingCommand} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.trees.TreePattern.{COMMAND, PARAMETER, PARAMETERIZED_QUERY, TreePattern, UNRESOLVED_WITH} -import org.apache.spark.sql.errors.QueryErrorsBase +import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryErrorsBase} import org.apache.spark.sql.types.DataType sealed trait Parameter extends LeafExpression with Unevaluable { @@ -104,6 +104,31 @@ case class PosParameterizedQuery(child: LogicalPlan, args: Seq[Expression]) copy(child = newChild) } +/** + * The logical plan representing a parameterized query with general parameter support. + * This allows the query to use either positional or named parameters based on the + * parameter markers found in the query, with optional parameter names provided. 
+ * + * @param child The parameterized logical plan. + * @param args The literal values or collection constructor functions such as `map()`, + * `array()`, `struct()` of parameters. + * @param paramNames Optional parameter names corresponding to args. If provided for an argument, + * that argument can be used for named parameter binding. If not provided + * parameters are treated as positional. + */ +case class GeneralParameterizedQuery( + child: LogicalPlan, + args: Seq[Expression], + paramNames: Seq[String]) + extends ParameterizedQuery(child) { + assert(args.nonEmpty) + assert(paramNames.length == args.length, + s"paramNames must be same length as args. " + + s"paramNames.length=${paramNames.length}, args.length=${args.length}") + override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan = + copy(child = newChild) +} + /** * Moves `ParameterizedQuery` inside `SupervisingCommand` for their supervised plans to be * resolved later by the analyzer. @@ -143,7 +168,7 @@ object MoveParameterizedQueriesDown extends Rule[LogicalPlan] { } /** - * Finds all named parameters in `ParameterizedQuery` and substitutes them by literals or + * Binds all named parameters in `ParameterizedQuery` and substitutes them by literals or * by collection constructor functions such as `map()`, `array()`, `struct()` * from the user-specified arguments. */ @@ -191,6 +216,7 @@ object BindParameters extends Rule[LogicalPlan] with QueryErrorsBase { case PosParameterizedQuery(child, args) if !child.containsPattern(UNRESOLVED_WITH) && args.forall(_.resolved) => + val indexedArgs = args.zipWithIndex checkArgs(indexedArgs.map(arg => (s"_${arg._2}", arg._1))) @@ -203,6 +229,63 @@ object BindParameters extends Rule[LogicalPlan] with QueryErrorsBase { args(posToIndex(pos)) } + case GeneralParameterizedQuery(child, args, paramNames) + if !child.containsPattern(UNRESOLVED_WITH) && + args.forall(_.resolved) => + + // Check all arguments for validity (args are already evaluated expressions/literals) + val allArgs = args.zip(paramNames).zipWithIndex.map { case ((arg, name), index) => + val finalName = if (name.isEmpty) s"_$index" else name + finalName -> arg + } + checkArgs(allArgs) + + // Collect parameter types used in the query to enforce invariants + var hasNamedParam = false + val positionalParams = scala.collection.mutable.Set.empty[Int] + bind(child) { + case p @ NamedParameter(_) => hasNamedParam = true; p + case p @ PosParameter(pos) => positionalParams.add(pos); p + } + + // Validate: no mixing of positional and named parameters + if (hasNamedParam && positionalParams.nonEmpty) { + throw QueryCompilationErrors.invalidQueryMixedQueryParameters() + } + + // Validate: if query uses named parameters, all USING expressions must have names + if (hasNamedParam && positionalParams.isEmpty) { + if (paramNames.isEmpty) { + // Query uses named parameters but no USING expressions provided + throw QueryCompilationErrors.invalidQueryAllParametersMustBeNamed(Seq.empty) + } else { + // Check that all USING expressions have names + val unnamedExpressions = paramNames.zipWithIndex.collect { + case ("", index) => index // empty strings are unnamed + } + if (unnamedExpressions.nonEmpty) { + val unnamedExprs = unnamedExpressions.map(args(_)) + throw QueryCompilationErrors.invalidQueryAllParametersMustBeNamed(unnamedExprs) + } + } + } + + // Now we can do simple binding based on which type we determined + if (hasNamedParam) { + // Named parameter binding - paramNames guaranteed to have no nulls at this point + val 
namedArgsMap = paramNames.zip(args).toMap + bind(child) { + case NamedParameter(name) => namedArgsMap.getOrElse(name, NamedParameter(name)) + } + } else { + // Positional parameter binding (same logic as PosParameterizedQuery) + val posToIndex = positionalParams.toSeq.sorted.zipWithIndex.toMap + bind(child) { + case PosParameter(pos) if posToIndex.contains(pos) && args.size > posToIndex(pos) => + args(posToIndex(pos)) + } + } + case other => other } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala index 2e0dbe9b7f42..b759c70266f7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala @@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, TableIden import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode} import org.apache.spark.sql.catalyst.parser.CatalystSqlParser -import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, UnaryNode} +import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, SupportsSubquery, UnaryNode} import org.apache.spark.sql.catalyst.trees.TreePattern._ import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.catalyst.util.TypeUtils.toSQLId @@ -1212,3 +1212,19 @@ trait UnresolvedPlanId extends LeafExpression with Unevaluable { // Subclasses can override this function to provide more TreePatterns. def nodePatternsInternal(): Seq[TreePattern] = Seq() } + +/** + * Logical plan representing execute immediate query. + * + * @param sqlStmtStr the query expression (first child) + * @param args parameters from USING clause (subsequent children) + * @param targetVariables variables to store the result of the query + */ +case class UnresolvedExecuteImmediate( + sqlStmtStr: Expression, + args: Seq[Expression], + targetVariables: Seq[Expression]) + extends UnresolvedLeafNode with SupportsSubquery { + + final override val nodePatterns: Seq[TreePattern] = Seq(EXECUTE_IMMEDIATE) +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 4e930280381c..b42cba86e0fb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -22,7 +22,6 @@ import java.util.concurrent.TimeUnit import scala.collection.mutable.{ArrayBuffer, HashMap, ListBuffer, Set} import scala.jdk.CollectionConverters._ -import scala.util.{Left, Right} import org.antlr.v4.runtime.{ParserRuleContext, RuleContext, Token} import org.antlr.v4.runtime.tree.{ParseTree, RuleNode, TerminalNode} @@ -1149,7 +1148,7 @@ class AstBuilder extends DataTypeAstBuilder } /** - * Returns the parameters for [[ExecuteImmediateQuery]] logical plan. + * Returns the parameters for [[UnresolvedExecuteImmediate]] logical plan. 
* Expected format: * {{{ * EXECUTE IMMEDIATE {query_string|string_literal} @@ -1157,11 +1156,8 @@ class AstBuilder extends DataTypeAstBuilder * }}} */ override def visitExecuteImmediate(ctx: ExecuteImmediateContext): LogicalPlan = withOrigin(ctx) { - // Because of how parsing rules are written, we know that either - // queryParam or targetVariable is non null - hence use Either to represent this. - val queryString = Option(ctx.queryParam.stringLit()).map(sl => Left(string(visitStringLit(sl)))) - val queryVariable = Option(ctx.queryParam.multipartIdentifier) - .map(mpi => Right(UnresolvedAttribute(visitMultipartIdentifier(mpi)))) + // With the new grammar, queryParam is now an expression + val queryParam = expression(ctx.queryParam) val targetVars = Option(ctx.targetVariable).toSeq .flatMap(v => visitMultipartIdentifierList(v)) @@ -1169,8 +1165,7 @@ class AstBuilder extends DataTypeAstBuilder visitExecuteImmediateUsing(_) }.getOrElse{ Seq.empty } - - ExecuteImmediateQuery(exprs, queryString.getOrElse(queryVariable.get), targetVars) + UnresolvedExecuteImmediate(queryParam, exprs, targetVars) } override def visitExecuteImmediateUsing( diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala index e7b59af5e776..c68b8a2c29af 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala @@ -97,7 +97,7 @@ object RuleIdCollection { "org.apache.spark.sql.catalyst.analysis.ResolveOrderByAll" :: "org.apache.spark.sql.catalyst.analysis.ResolveRowLevelCommandAssignments" :: "org.apache.spark.sql.catalyst.analysis.ResolveSetVariable" :: - "org.apache.spark.sql.catalyst.analysis.SubstituteExecuteImmediate" :: + "org.apache.spark.sql.catalyst.analysis.ResolveExecuteImmediate" :: "org.apache.spark.sql.catalyst.analysis.ResolveTableSpec" :: "org.apache.spark.sql.catalyst.analysis.ResolveTimeZone" :: "org.apache.spark.sql.catalyst.analysis.ResolveUnion" :: diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala index 74eed741622f..d1c81990f4f4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala @@ -4131,10 +4131,10 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat messageParameters = Map.empty) } - def invalidExecuteImmediateVariableType(dataType: DataType): Throwable = { + def invalidExecuteImmediateExpressionType(dataType: DataType): Throwable = { throw new AnalysisException( - errorClass = "INVALID_VARIABLE_TYPE_FOR_QUERY_EXECUTE_IMMEDIATE", - messageParameters = Map("varType" -> toSQLType(dataType))) + errorClass = "INVALID_EXPR_TYPE_FOR_QUERY_EXECUTE_IMMEDIATE", + messageParameters = Map("exprType" -> toSQLType(dataType))) } def nullSQLStringExecuteImmediate(varName: String): Throwable = { @@ -4149,12 +4149,6 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat messageParameters = Map("sqlString" -> toSQLStmt(queryString))) } - def nestedExecuteImmediate(queryString: String): Throwable = { - throw new AnalysisException( - errorClass = "NESTED_EXECUTE_IMMEDIATE", - messageParameters = Map("sqlString" -> 
toSQLStmt(queryString))) - } - def sqlScriptInExecuteImmediate(sqlScriptString: String): Throwable = { throw new AnalysisException( errorClass = "SQL_SCRIPT_IN_EXECUTE_IMMEDIATE", diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala index a301f77cf0c0..94f650bc35c7 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala @@ -834,88 +834,10 @@ class AnalysisErrorSuite extends AnalysisTest with DataTypeErrorsBase { """"explode(array(min(a)))", "explode(array(max(a)))"""" :: Nil ) - errorConditionTest( - "EXEC IMMEDIATE - nested execute immediate not allowed", - CatalystSqlParser.parsePlan("EXECUTE IMMEDIATE 'EXECUTE IMMEDIATE \\\'SELECT 42\\\''"), - "NESTED_EXECUTE_IMMEDIATE", - Map( - "sqlString" -> "EXECUTE IMMEDIATE 'SELECT 42'")) - - errorConditionTest( - "EXEC IMMEDIATE - both positional and named used", - CatalystSqlParser.parsePlan("EXECUTE IMMEDIATE 'SELECT 42 where ? = :first'" + - " USING 1, 2 as first"), - "INVALID_QUERY_MIXED_QUERY_PARAMETERS", - Map.empty) - - test("EXEC IMMEDIATE - non string variable as sqlString parameter") { - val execImmediatePlan = ExecuteImmediateQuery( - Seq.empty, - scala.util.Right(UnresolvedAttribute("testVarA")), - Seq(UnresolvedAttribute("testVarA"))) - - assertAnalysisErrorCondition( - inputPlan = execImmediatePlan, - expectedErrorCondition = "INVALID_VARIABLE_TYPE_FOR_QUERY_EXECUTE_IMMEDIATE", - expectedMessageParameters = Map( - "varType" -> "\"INT\"" - )) - } - - test("EXEC IMMEDIATE - Null string as sqlString parameter") { - val execImmediatePlan = ExecuteImmediateQuery( - Seq.empty, - scala.util.Right(UnresolvedAttribute("testVarNull")), - Seq(UnresolvedAttribute("testVarNull"))) - - assertAnalysisErrorCondition( - inputPlan = execImmediatePlan, - expectedErrorCondition = "NULL_QUERY_STRING_EXECUTE_IMMEDIATE", - expectedMessageParameters = Map("varName" -> "`testVarNull`")) - } - test("EXEC IMMEDIATE - Unsupported expr for parameter") { - val execImmediatePlan: LogicalPlan = ExecuteImmediateQuery( - Seq(UnresolvedAttribute("testVarA"), NaNvl(Literal(1), Literal(1))), - scala.util.Left("SELECT ?"), - Seq.empty) - assertAnalysisErrorCondition( - inputPlan = execImmediatePlan, - expectedErrorCondition = "UNSUPPORTED_EXPR_FOR_PARAMETER", - expectedMessageParameters = Map( - "invalidExprSql" -> "\"nanvl(1, 1)\"" - )) - } - - test("EXEC IMMEDIATE - Name Parametrize query with non named parameters") { - val execImmediateSetVariablePlan = ExecuteImmediateQuery( - Seq(Literal(2), new Alias(UnresolvedAttribute("testVarA"), "first")(), Literal(3)), - scala.util.Left("SELECT :first"), - Seq.empty) - - assertAnalysisErrorCondition( - inputPlan = execImmediateSetVariablePlan, - expectedErrorCondition = "ALL_PARAMETERS_MUST_BE_NAMED", - expectedMessageParameters = Map( - "exprs" -> "\"2\", \"3\"" - )) - } - - test("EXEC IMMEDIATE - INTO specified for COMMAND query") { - val execImmediateSetVariablePlan = ExecuteImmediateQuery( - Seq.empty, - scala.util.Left("SET VAR testVarA = 1"), - Seq(UnresolvedAttribute("testVarA"))) - assertAnalysisErrorCondition( - inputPlan = execImmediateSetVariablePlan, - expectedErrorCondition = "INVALID_STATEMENT_FOR_EXECUTE_INTO", - expectedMessageParameters = Map( - "sqlString" -> "SET VAR TESTVARA = 1" - )) - } test("SPARK-6452 regression 
test") { // CheckAnalysis should throw AnalysisException when Aggregate contains missing attribute(s) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala index 0c8d2bae418a..5992f2d099bd 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala @@ -30,7 +30,7 @@ import org.apache.spark.SparkException import org.apache.spark.api.python.PythonEvalType import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.{AliasIdentifier, QueryPlanningTracker, TableIdentifier} -import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog, VariableDefinition} +import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog} import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder @@ -42,7 +42,7 @@ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, RangePartitioning, RoundRobinPartitioning} import org.apache.spark.sql.catalyst.types.DataTypeUtils import org.apache.spark.sql.catalyst.util._ -import org.apache.spark.sql.connector.catalog.{Identifier, InMemoryTable} +import org.apache.spark.sql.connector.catalog.InMemoryTable import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation import org.apache.spark.sql.internal.SQLConf @@ -1520,34 +1520,6 @@ class AnalysisSuite extends AnalysisTest with Matchers { assertAnalysisSuccess(finalPlan) } - test("Execute Immediate plan transformation") { - try { - val varDef1 = VariableDefinition(Identifier.of(Array("res"), "res"), "1", Literal(1)) - SimpleAnalyzer.catalogManager.tempVariableManager.create( - Seq("res", "res"), varDef1, overrideIfExists = true) - - val varDef2 = VariableDefinition(Identifier.of(Array("res2"), "res2"), "1", Literal(1)) - SimpleAnalyzer.catalogManager.tempVariableManager.create( - Seq("res2", "res2"), varDef2, overrideIfExists = true) - val actual1 = parsePlan("EXECUTE IMMEDIATE 'SELECT 42 WHERE ? = 1' USING 2").analyze - val expected1 = parsePlan("SELECT 42 where 2 = 1").analyze - comparePlans(actual1, expected1) - val actual2 = parsePlan( - "EXECUTE IMMEDIATE 'SELECT 42 WHERE :first = 1' USING 2 as first").analyze - val expected2 = parsePlan("SELECT 42 where 2 = 1").analyze - comparePlans(actual2, expected2) - // Test that plan is transformed to SET operation - val actual3 = parsePlan( - "EXECUTE IMMEDIATE 'SELECT 17, 7 WHERE ? = 1' INTO res, res2 USING 2").analyze - // Normalize to make the plan equivalent to the below set statement. 
- val expected3 = parsePlan("SET var (res, res2) = (SELECT 17, 7 where 2 = 1)").analyze - comparePlans(actual3, expected3) - } finally { - SimpleAnalyzer.catalogManager.tempVariableManager.remove(Seq("res")) - SimpleAnalyzer.catalogManager.tempVariableManager.remove(Seq("res2")) - } - } - test("SPARK-41271: bind named parameters to literals") { CTERelationDef.curId.set(0) val actual1 = NameParameterizedQuery( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveExecuteImmediate.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveExecuteImmediate.scala new file mode 100644 index 000000000000..43d08c47b14a --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveExecuteImmediate.scala @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.analysis + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.SqlScriptingContextManager +import org.apache.spark.sql.catalyst.expressions.{Alias, EmptyRow, Expression, Literal, + VariableReference} +import org.apache.spark.sql.catalyst.plans.logical.{Command, CompoundBody, LogicalPlan, SetVariable} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.catalyst.trees.TreePattern.EXECUTE_IMMEDIATE +import org.apache.spark.sql.connector.catalog.CatalogManager +import org.apache.spark.sql.errors.QueryCompilationErrors +import org.apache.spark.sql.types.StringType + +/** + * Analysis rule that resolves and executes EXECUTE IMMEDIATE statements during analysis, + * replacing them with the results, similar to how CALL statements work. + * This rule combines resolution and execution in a single pass. + */ +case class ResolveExecuteImmediate(sparkSession: SparkSession, catalogManager: CatalogManager) + extends Rule[LogicalPlan] { + + override def apply(plan: LogicalPlan): LogicalPlan = { + plan.resolveOperatorsWithPruning(_.containsPattern(EXECUTE_IMMEDIATE), ruleId) { + case node @ UnresolvedExecuteImmediate(sqlStmtStr, args, targetVariables) => + if (sqlStmtStr.resolved && targetVariables.forall(_.resolved) && args.forall(_.resolved)) { + // All resolved - execute immediately and handle INTO clause if present + if (targetVariables.nonEmpty) { + // EXECUTE IMMEDIATE ... 
INTO should generate SetVariable plan with eagerly executed + // source + val finalTargetVars = extractTargetVariables(targetVariables) + val executedSource = executeImmediateQuery(sqlStmtStr, args, hasIntoClause = true) + SetVariable(finalTargetVars, executedSource) + } else { + // Regular EXECUTE IMMEDIATE without INTO - execute and return result directly + executeImmediateQuery(sqlStmtStr, args, hasIntoClause = false) + } + } else { + // Not all resolved yet - wait for next iteration + node + } + } + } + + private def extractTargetVariables(targetVariables: Seq[Expression]): Seq[VariableReference] = { + targetVariables.map { + case alias: Alias => + // Extract the VariableReference from the alias + alias.child match { + case varRef: VariableReference => + // Use resolved VariableReference directly with canFold = false + varRef.copy(canFold = false) + case _ => + throw QueryCompilationErrors.unsupportedParameterExpression(alias.child) + } + case varRef: VariableReference => + // Use resolved VariableReference directly with canFold = false + varRef.copy(canFold = false) + case other => + throw QueryCompilationErrors.unsupportedParameterExpression(other) + } + } + + private def executeImmediateQuery( + sqlStmtStr: Expression, + args: Seq[Expression], + hasIntoClause: Boolean): LogicalPlan = { + // Extract the query string from the queryParam expression + val sqlString = extractQueryString(sqlStmtStr) + + // Parse and validate the query + val parsedPlan = sparkSession.sessionState.sqlParser.parsePlan(sqlString) + validateQuery(sqlString, parsedPlan) + + // Execute the query recursively with isolated local variable context + val result = if (args.isEmpty) { + // No parameters - execute directly + withIsolatedLocalVariableContext { + sparkSession.sql(sqlString) + } + } else { + // For parameterized queries, build parameter arrays + val (paramValues, paramNames) = buildUnifiedParameters(args) + + withIsolatedLocalVariableContext { + sparkSession.asInstanceOf[org.apache.spark.sql.classic.SparkSession] + .sql(sqlString, paramValues, paramNames) + } + } + + // If this EXECUTE IMMEDIATE has an INTO clause, commands are not allowed + if (hasIntoClause && result.queryExecution.analyzed.isInstanceOf[Command]) { + throw QueryCompilationErrors.invalidStatementForExecuteInto(sqlString) + } + + // For commands, use commandExecuted to avoid double execution + // For queries, use analyzed to avoid eager evaluation + if (result.queryExecution.analyzed.isInstanceOf[Command]) { + result.queryExecution.commandExecuted + } else { + result.queryExecution.analyzed + } + } + + private def extractQueryString(queryExpr: Expression): String = { + // Ensure the expression resolves to string type + if (!queryExpr.dataType.sameType(StringType)) { + throw QueryCompilationErrors.invalidExecuteImmediateExpressionType(queryExpr.dataType) + } + + // Evaluate the expression to get the query string + val value = queryExpr.eval(null) + if (value == null) { + // Extract the original text from the expression's origin for the error message + val originalText = extractOriginalText(queryExpr) + throw QueryCompilationErrors.nullSQLStringExecuteImmediate(originalText) + } + + value.toString + } + + private def extractOriginalText(queryExpr: Expression): String = { + val origin = queryExpr.origin + // Try to extract the original text from the origin information + (origin.sqlText, origin.startIndex, origin.stopIndex) match { + case (Some(sqlText), Some(startIndex), Some(stopIndex)) => + // Extract the substring from the original SQL 
text + sqlText.substring(startIndex, stopIndex + 1) + case _ => + // Fallback to the SQL representation if origin information is not available + queryExpr.sql + } + } + + private def validateQuery(queryString: String, parsedPlan: LogicalPlan): Unit = { + // Check for compound bodies (SQL scripting) + if (parsedPlan.isInstanceOf[CompoundBody]) { + throw QueryCompilationErrors.sqlScriptInExecuteImmediate(queryString) + } + } + + /** + * Builds parameter arrays for the sql() API. + */ + private def buildUnifiedParameters(args: Seq[Expression]): (Array[Any], Array[String]) = { + val values = scala.collection.mutable.ListBuffer[Any]() + val names = scala.collection.mutable.ListBuffer[String]() + + args.foreach { + case alias: Alias => + val paramValue = evaluateParameterExpression(alias.child) + values += paramValue + names += alias.name + case expr => + // Positional parameter: just a value + val paramValue = evaluateParameterExpression(expr) + values += paramValue + names += "" // unnamed expression + } + + (values.toArray, names.toArray) + } + + /** + * Evaluates a parameter expression. Validation for unsupported constructs like subqueries + * is already done during analysis in ResolveExecuteImmediate.validateExpressions(). + */ + private def evaluateParameterExpression(expr: Expression): Any = { + expr match { + case varRef: VariableReference => + // Variable references should be evaluated to their values + varRef.eval(EmptyRow) + case foldable if foldable.foldable => + Literal.create(foldable.eval(EmptyRow), foldable.dataType).value + case other => + // Expression is not foldable - not supported for parameters + throw QueryCompilationErrors.unsupportedParameterExpression(other) + } + } + + /** + * Temporarily isolates the SQL scripting context during EXECUTE IMMEDIATE execution. + * This makes withinSqlScript() return false, ensuring that statements within EXECUTE IMMEDIATE + * are not affected by the outer SQL script context (e.g., local variables, script-specific + * errors). 
+ */ + private def withIsolatedLocalVariableContext[A](f: => A): A = { + // Completely clear the SQL scripting context to make withinSqlScript() return false + val handle = SqlScriptingContextManager.create(null) + handle.runWith(f) + } +}
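The `withIsolatedLocalVariableContext` helper above works by installing an empty scripting context for the duration of the nested execution. A minimal sketch of that scoped swap-and-restore pattern (the `ThreadLocalHolder` name is illustrative, not Spark API):

```scala
// Illustrative swap-and-restore pattern behind a scoped context manager.
object ThreadLocalHolder {
  private val ctx = new ThreadLocal[AnyRef]

  def runWith[A](replacement: AnyRef)(f: => A): A = {
    val saved = ctx.get()
    ctx.set(replacement)   // e.g. null, to hide the outer script's local variables
    try f
    finally ctx.set(saved) // always restore the outer context afterwards
  }
}
```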
+ */ + private[sql] def sql( + sqlText: String, + args: Array[_], + paramNames: Array[String], + tracker: QueryPlanningTracker): DataFrame = + withActive { + val plan = tracker.measurePhase(QueryPlanningTracker.PARSING) { + val parsedPlan = sessionState.sqlParser.parsePlan(sqlText) + if (args.nonEmpty) { + // Check for SQL scripting with positional parameters before creating parameterized query + if (parsedPlan.isInstanceOf[CompoundBody] && paramNames.isEmpty) { + throw SqlScriptingErrors.positionalParametersAreNotSupportedWithSqlScripting() + } + // Create a general parameter query that can handle both positional and named parameters + // The query itself will determine which type to use based on its parameter markers + GeneralParameterizedQuery( + parsedPlan, + args.map(lit(_).expr).toImmutableArraySeq, + paramNames.toImmutableArraySeq + ) + } else { + parsedPlan + } + } + Dataset.ofRows(self, plan, tracker) + } + /** @inheritdoc */ override def sql(sqlText: String): DataFrame = sql(sqlText, Map.empty[String, Any]) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala index a3591ff89e5c..e31e7e8d704c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.command import org.apache.spark.internal.Logging import org.apache.spark.internal.LogKeys.{CONFIG, CONFIG2, KEY, VALUE} import org.apache.spark.sql.{AnalysisException, Row, SparkSession} -import org.apache.spark.sql.catalyst.analysis.{AnalysisContext, VariableResolution} +import org.apache.spark.sql.catalyst.analysis.VariableResolution import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.catalyst.plans.logical.IgnoreCachedData @@ -112,8 +112,7 @@ case class SetCommand(kv: Option[(String, Option[String])]) sparkSession.sessionState.analyzer.catalogManager.tempVariableManager ) val variable = variableResolution.lookupVariable( - nameParts = varName, - resolvingExecuteImmediate = AnalysisContext.get.isExecuteImmediate + nameParts = varName ) if (variable.isDefined) { throw new AnalysisException( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala index ada30cde27cd..5abb1e75543a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.internal import org.apache.spark.annotation.Unstable import org.apache.spark.sql.{DataSourceRegistration, ExperimentalMethods, SparkSessionExtensions, UDTFRegistration} import org.apache.spark.sql.artifact.ArtifactManager -import org.apache.spark.sql.catalyst.analysis.{Analyzer, EvalSubqueriesForTimeTravel, FunctionRegistry, InvokeProcedures, ReplaceCharWithVarchar, ResolveDataSource, ResolveSessionCatalog, ResolveTranspose, TableFunctionRegistry} +import org.apache.spark.sql.catalyst.analysis.{Analyzer, EvalSubqueriesForTimeTravel, FunctionRegistry, InvokeProcedures, ReplaceCharWithVarchar, ResolveDataSource, ResolveExecuteImmediate, ResolveSessionCatalog, ResolveTranspose, TableFunctionRegistry} import 
org.apache.spark.sql.catalyst.analysis.resolver.ResolverExtension import org.apache.spark.sql.catalyst.catalog.{FunctionExpressionBuilder, SessionCatalog} import org.apache.spark.sql.catalyst.expressions.{Expression, ExtractSemiStructuredFields} @@ -244,6 +244,7 @@ abstract class BaseSessionStateBuilder( new EvalSubqueriesForTimeTravel +: new ResolveTranspose(session) +: new InvokeProcedures(session) +: + ResolveExecuteImmediate(session, this.catalogManager) +: ExtractSemiStructuredFields +: customResolutionRules diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/execute-immediate.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/execute-immediate.sql.out index d575cac56d28..b576f0454d45 100644 --- a/sql/core/src/test/resources/sql-tests/analyzer-results/execute-immediate.sql.out +++ b/sql/core/src/test/resources/sql-tests/analyzer-results/execute-immediate.sql.out @@ -48,20 +48,22 @@ SetVariable [variablereference(system.session.sql_string=CAST(NULL AS STRING))] -- !query EXECUTE IMMEDIATE 'SET spark.sql.ansi.enabled=true' -- !query analysis -SetCommand (spark.sql.ansi.enabled,Some(true)) +CommandResult [key#x, value#x], Execute SetCommand, [[spark.sql.ansi.enabled,true]] + +- SetCommand (spark.sql.ansi.enabled,Some(true)) -- !query EXECUTE IMMEDIATE 'CREATE TEMPORARY VIEW IDENTIFIER(:tblName) AS SELECT id, name FROM tbl_view' USING 'tbl_view_tmp' as tblName -- !query analysis -CreateViewCommand `tbl_view_tmp`, SELECT id, name FROM tbl_view, false, false, LocalTempView, UNSUPPORTED, true - +- Project [id#x, name#x] - +- SubqueryAlias tbl_view - +- View (`tbl_view`, [id#x, name#x, data#x]) - +- Project [cast(id#x as int) AS id#x, cast(name#x as string) AS name#x, cast(data#x as struct<f1:int,s2:struct<f2:int,f3:string>>) AS data#x] - +- Project [id#x, name#x, data#x] - +- SubqueryAlias tbl_view - +- LocalRelation [id#x, name#x, data#x] +CommandResult Execute CreateViewCommand + +- CreateViewCommand `tbl_view_tmp`, SELECT id, name FROM tbl_view, false, false, LocalTempView, UNSUPPORTED, true + +- Project [id#x, name#x] + +- SubqueryAlias tbl_view + +- View (`tbl_view`, [id#x, name#x, data#x]) + +- Project [cast(id#x as int) AS id#x, cast(name#x as string) AS name#x, cast(data#x as struct<f1:int,s2:struct<f2:int,f3:string>>) AS data#x] + +- Project [id#x, name#x, data#x] + +- SubqueryAlias tbl_view + +- LocalRelation [id#x, name#x, data#x] -- !query @@ -83,7 +85,8 @@ Project [id#x, name#x] -- !query EXECUTE IMMEDIATE 'REFRESH TABLE IDENTIFIER(:tblName)' USING 'x' as tblName -- !query analysis -RefreshTableCommand `spark_catalog`.`default`.`x` +CommandResult Execute RefreshTableCommand + +- RefreshTableCommand `spark_catalog`.`default`.`x` -- !query @@ -152,7 +155,7 @@ Project [id#x, name#x, data#x] EXECUTE IMMEDIATE sql_string USING a, 'name2' -- !query analysis Project [id#x, name#x, data#x] -+- Filter ((name#x = variablereference(system.session.a='name1')) OR (name#x = name2)) ++- Filter ((name#x = name1) OR (name#x = name2)) +- SubqueryAlias tbl_view +- View (`tbl_view`, [id#x, name#x, data#x]) +- Project [cast(id#x as int) AS id#x, cast(name#x as string) AS name#x, cast(data#x as struct<f1:int,s2:struct<f2:int,f3:string>>) AS data#x] @@ -178,7 +181,7 @@ Project [id#x, name#x, data#x] EXECUTE IMMEDIATE 'SELECT * from tbl_view where name = ? or name = ?' 
USING a, 'name2' -- !query analysis Project [id#x, name#x, data#x] -+- Filter ((name#x = variablereference(system.session.a='name1')) OR (name#x = name2)) ++- Filter ((name#x = name1) OR (name#x = name2)) +- SubqueryAlias tbl_view +- View (`tbl_view`, [id#x, name#x, data#x]) +- Project [cast(id#x as int) AS id#x, cast(name#x as string) AS name#x, cast(data#x as struct<f1:int,s2:struct<f2:int,f3:string>>) AS data#x] @@ -191,7 +194,7 @@ Project [id#x, name#x, data#x] EXECUTE IMMEDIATE 'SELECT * from tbl_view where name = ? or name = ?' USING (a, 'name2') -- !query analysis Project [id#x, name#x, data#x] -+- Filter ((name#x = variablereference(system.session.a='name1')) OR (name#x = name2)) ++- Filter ((name#x = name1) OR (name#x = name2)) +- SubqueryAlias tbl_view +- View (`tbl_view`, [id#x, name#x, data#x]) +- Project [cast(id#x as int) AS id#x, cast(name#x as string) AS name#x, cast(data#x as struct<f1:int,s2:struct<f2:int,f3:string>>) AS data#x] @@ -203,9 +206,10 @@ Project [id#x, name#x, data#x] -- !query EXECUTE IMMEDIATE 'INSERT INTO x VALUES(?)' USING 1 -- !query analysis -InsertIntoHadoopFsRelationCommand file:[not included in comparison]/{warehouse_dir}/x, false, CSV, [path=file:[not included in comparison]/{warehouse_dir}/x], Append, `spark_catalog`.`default`.`x`, org.apache.spark.sql.execution.datasources.InMemoryFileIndex(file:[not included in comparison]/{warehouse_dir}/x), [id] -+- Project [col1#x AS id#x] - +- LocalRelation [col1#x] +CommandResult Execute InsertIntoHadoopFsRelationCommand file:[not included in comparison]/{warehouse_dir}/x, false, CSV, [path=file:[not included in comparison]/{warehouse_dir}/x], Append, `spark_catalog`.`default`.`x`, org.apache.spark.sql.execution.datasources.InMemoryFileIndex(file:[not included in comparison]/{warehouse_dir}/x), [id] + +- InsertIntoHadoopFsRelationCommand file:[not included in comparison]/{warehouse_dir}/x, false, CSV, [path=file:[not included in comparison]/{warehouse_dir}/x], Append, `spark_catalog`.`default`.`x`, org.apache.spark.sql.execution.datasources.InMemoryFileIndex(file:[not included in comparison]/{warehouse_dir}/x), [id] + +- Project [col1#x AS id#x] + +- LocalRelation [col1#x] -- !query @@ -256,7 +260,7 @@ Project [id#x, name#x, data#x] EXECUTE IMMEDIATE sql_string USING b as second, 'name7' as first -- !query analysis Project [id#x, name#x, data#x] -+- Filter ((name#x = name7) OR (id#x = variablereference(system.session.b=40))) ++- Filter ((name#x = name7) OR (id#x = 40)) +- SubqueryAlias tbl_view +- View (`tbl_view`, [id#x, name#x, data#x]) +- Project [cast(id#x as int) AS id#x, cast(name#x as string) AS name#x, cast(data#x as struct<f1:int,s2:struct<f2:int,f3:string>>) AS data#x] @@ -282,7 +286,7 @@ Project [id#x, name#x, data#x] EXECUTE IMMEDIATE 'SELECT * from tbl_view where name = :first or id = :second' USING 'name7' as first, b as second -- !query analysis Project [id#x, name#x, data#x] -+- Filter ((name#x = name7) OR (id#x = variablereference(system.session.b=40))) ++- Filter ((name#x = name7) OR (id#x = 40)) +- SubqueryAlias tbl_view +- View (`tbl_view`, [id#x, name#x, data#x]) +- Project [cast(id#x as int) AS id#x, cast(name#x as string) AS name#x, cast(data#x as struct<f1:int,s2:struct<f2:int,f3:string>>) AS data#x] @@ -307,9 +311,10 @@ Project [id#x, name#x, data#x, name7 AS p#x] -- !query EXECUTE IMMEDIATE 'SET VAR sql_string = ?' 
USING 'SELECT id from tbl_view where name = :first' -- !query analysis -SetVariable [variablereference(system.session.sql_string='SELECT * from tbl_view where name = :first or id = :second')] -+- Project [SELECT id from tbl_view where name = :first AS sql_string#x] - +- OneRowRelation +CommandResult SetVariable [variablereference(system.session.sql_string='SELECT * from tbl_view where name = :first or id = :second')] + +- SetVariable [variablereference(system.session.sql_string='SELECT * from tbl_view where name = :first or id = :second')] + +- Project [SELECT id from tbl_view where name = :first AS sql_string#x] + +- OneRowRelation -- !query @@ -356,7 +361,7 @@ SetVariable [variablereference(system.session.res_id=70)] +- GlobalLimit 2 +- LocalLimit 2 +- Project [id#x] - +- Filter (name#x = variablereference(system.session.a='name1')) + +- Filter (name#x = name1) +- SubqueryAlias tbl_view +- View (`tbl_view`, [id#x, name#x, data#x]) +- Project [cast(id#x as int) AS id#x, cast(name#x as string) AS name#x, cast(data#x as struct<f1:int,s2:struct<f2:int,f3:string>>) AS data#x] @@ -422,7 +427,7 @@ Project [variablereference(system.session.b=10) AS b#x, variablereference(system EXECUTE IMMEDIATE 'SELECT * FROM tbl_view where id = ? AND name = ?' USING b as first, a -- !query analysis Project [id#x, name#x, data#x] -+- Filter ((id#x = variablereference(system.session.b=10)) AND (name#x = variablereference(system.session.a='name1'))) ++- Filter ((id#x = 10) AND (name#x = name1)) +- SubqueryAlias tbl_view +- View (`tbl_view`, [id#x, name#x, data#x]) +- Project [cast(id#x as int) AS id#x, cast(name#x as string) AS name#x, cast(data#x as struct<f1:int,s2:struct<f2:int,f3:string>>) AS data#x] @@ -500,15 +505,34 @@ org.apache.spark.sql.AnalysisException -- !query -EXECUTE IMMEDIATE 'SELECT * FROM tbl_view WHERE ? = id' USING id +DECLARE OR REPLACE testvarA INT +-- !query analysis +CreateVariable defaultvalueexpression(null, null), true ++- ResolvedIdentifier org.apache.spark.sql.catalyst.analysis.FakeSystemCatalog$@xxxxxxxx, session.testvarA + + +-- !query +EXECUTE IMMEDIATE 'SET VAR testVarA = 1' INTO testVarA -- !query analysis org.apache.spark.sql.AnalysisException { - "errorClass" : "UNRESOLVED_VARIABLE", - "sqlState" : "42883", + "errorClass" : "INVALID_STATEMENT_FOR_EXECUTE_INTO", + "sqlState" : "07501", "messageParameters" : { - "searchPath" : "`system`.`session`", - "variableName" : "`id`" + "sqlString" : "SET VAR TESTVARA = 1" + } +} + + +-- !query +EXECUTE IMMEDIATE 'SELECT * FROM tbl_view WHERE ? 
= id' USING id +-- !query analysis +org.apache.spark.sql.catalyst.ExtendedAnalysisException +{ + "errorClass" : "UNRESOLVED_COLUMN.WITHOUT_SUGGESTION", + "sqlState" : "42703", + "messageParameters" : { + "objectName" : "`id`" }, "queryContext" : [ { "objectType" : "", @@ -590,10 +614,10 @@ EXECUTE IMMEDIATE b -- !query analysis org.apache.spark.sql.AnalysisException { - "errorClass" : "INVALID_VARIABLE_TYPE_FOR_QUERY_EXECUTE_IMMEDIATE", + "errorClass" : "INVALID_EXPR_TYPE_FOR_QUERY_EXECUTE_IMMEDIATE", "sqlState" : "42K09", "messageParameters" : { - "varType" : "\"INT\"" + "exprType" : "\"INT\"" } } @@ -617,21 +641,14 @@ SetVariable [variablereference(system.session.a='name1')] -- !query EXECUTE IMMEDIATE 'SELECT * from tbl_view where name = :first' USING CONCAT(a , "me1") as first -- !query analysis -org.apache.spark.sql.AnalysisException -{ - "errorClass" : "UNSUPPORTED_EXPR_FOR_PARAMETER", - "sqlState" : "42K0E", - "messageParameters" : { - "invalidExprSql" : "\"CONCAT(a, me1)\"" - }, - "queryContext" : [ { - "objectType" : "", - "objectName" : "", - "startIndex" : 70, - "stopIndex" : 86, - "fragment" : "CONCAT(a , \"me1\")" - } ] -} +Project [id#x, name#x, data#x] ++- Filter (name#x = name1) + +- SubqueryAlias tbl_view + +- View (`tbl_view`, [id#x, name#x, data#x]) + +- Project [cast(id#x as int) AS id#x, cast(name#x as string) AS name#x, cast(data#x as struct<f1:int,s2:struct<f2:int,f3:string>>) AS data#x] + +- Project [id#x, name#x, data#x] + +- SubqueryAlias tbl_view + +- LocalRelation [id#x, name#x, data#x] -- !query @@ -760,7 +777,7 @@ CreateVariable defaultvalueexpression(10, 10), false EXECUTE IMMEDIATE 'SELECT id FROM tbl_view WHERE id = :p' USING p -- !query analysis Project [id#x] -+- Filter (id#x = variablereference(system.session.p=10)) ++- Filter (id#x = 10) +- SubqueryAlias tbl_view +- View (`tbl_view`, [id#x, name#x, data#x]) +- Project [cast(id#x as int) AS id#x, cast(name#x as string) AS name#x, cast(data#x as struct<f1:int,s2:struct<f2:int,f3:string>>) AS data#x] @@ -796,36 +813,200 @@ org.apache.spark.sql.AnalysisException -- !query -EXECUTE IMMEDIATE 'EXECUTE IMMEDIATE \'SELECT id FROM tbl_view WHERE id = ? USING 10\'' +EXECUTE IMMEDIATE 'EXECUTE IMMEDIATE \'SELECT id FROM tbl_view WHERE id = ?\' USING 10' +-- !query analysis +Project [id#x] ++- Filter (id#x = 10) + +- SubqueryAlias tbl_view + +- View (`tbl_view`, [id#x, name#x, data#x]) + +- Project [cast(id#x as int) AS id#x, cast(name#x as string) AS name#x, cast(data#x as struct<f1:int,s2:struct<f2:int,f3:string>>) AS data#x] + +- Project [id#x, name#x, data#x] + +- SubqueryAlias tbl_view + +- LocalRelation [id#x, name#x, data#x] + + +-- !query +SET VAR sql_string = null +-- !query analysis +SetVariable [variablereference(system.session.sql_string='SELECT * from tbl_view where name = :first or id = :second')] ++- Project [cast(sql_string#x as string) AS sql_string#x] + +- Project [null AS sql_string#x] + +- OneRowRelation + + +-- !query +EXECUTE IMMEDIATE sql_string -- !query analysis org.apache.spark.sql.AnalysisException { - "errorClass" : "NESTED_EXECUTE_IMMEDIATE", - "sqlState" : "07501", + "errorClass" : "NULL_QUERY_STRING_EXECUTE_IMMEDIATE", + "sqlState" : "22004", "messageParameters" : { - "sqlString" : "EXECUTE IMMEDIATE 'SELECT ID FROM TBL_VIEW WHERE ID = ? 
USING 10'" + "varName" : "`sql_string`" } } -- !query -SET VAR sql_string = null +SET VAR sql_string = 5 -- !query analysis -SetVariable [variablereference(system.session.sql_string='SELECT * from tbl_view where name = :first or id = :second')] +SetVariable [variablereference(system.session.sql_string=CAST(NULL AS STRING))] +- Project [cast(sql_string#x as string) AS sql_string#x] - +- Project [null AS sql_string#x] + +- Project [5 AS sql_string#x] +- OneRowRelation -- !query EXECUTE IMMEDIATE sql_string -- !query analysis +org.apache.spark.sql.catalyst.parser.ParseException +{ + "errorClass" : "PARSE_SYNTAX_ERROR", + "sqlState" : "42601", + "messageParameters" : { + "error" : "'5'", + "hint" : "" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 1, + "stopIndex" : 28, + "fragment" : "EXECUTE IMMEDIATE sql_string" + } ] +} + + +-- !query +SET VAR sql_string = 'hello' +-- !query analysis +SetVariable [variablereference(system.session.sql_string='5')] ++- Project [hello AS sql_string#x] + +- OneRowRelation + + +-- !query +EXECUTE IMMEDIATE length(sql_string) +-- !query analysis +org.apache.spark.sql.AnalysisException +{ + "errorClass" : "INVALID_EXPR_TYPE_FOR_QUERY_EXECUTE_IMMEDIATE", + "sqlState" : "42K09", + "messageParameters" : { + "exprType" : "\"INT\"" + } +} + + +-- !query +EXECUTE IMMEDIATE 'SELECT 42 where ? = :first' USING 1, 2 as first +-- !query analysis +org.apache.spark.sql.AnalysisException +{ + "errorClass" : "INVALID_QUERY_MIXED_QUERY_PARAMETERS", + "sqlState" : "42613" +} + + +-- !query +DECLARE int_var INT +-- !query analysis +CreateVariable defaultvalueexpression(null, null), false ++- ResolvedIdentifier org.apache.spark.sql.catalyst.analysis.FakeSystemCatalog$@xxxxxxxx, session.int_var + + +-- !query +SET VAR int_var = 42 +-- !query analysis +SetVariable [variablereference(system.session.int_var=CAST(NULL AS INT))] ++- Project [42 AS int_var#x] + +- OneRowRelation + + +-- !query +EXECUTE IMMEDIATE int_var +-- !query analysis +org.apache.spark.sql.AnalysisException +{ + "errorClass" : "INVALID_EXPR_TYPE_FOR_QUERY_EXECUTE_IMMEDIATE", + "sqlState" : "42K09", + "messageParameters" : { + "exprType" : "\"INT\"" + } +} + + +-- !query +DECLARE null_var STRING +-- !query analysis +CreateVariable defaultvalueexpression(null, null), false ++- ResolvedIdentifier org.apache.spark.sql.catalyst.analysis.FakeSystemCatalog$@xxxxxxxx, session.null_var + + +-- !query +SET VAR null_var = null +-- !query analysis +SetVariable [variablereference(system.session.null_var=CAST(NULL AS STRING))] ++- Project [cast(null_var#x as string) AS null_var#x] + +- Project [null AS null_var#x] + +- OneRowRelation + + +-- !query +EXECUTE IMMEDIATE null_var +-- !query analysis org.apache.spark.sql.AnalysisException { "errorClass" : "NULL_QUERY_STRING_EXECUTE_IMMEDIATE", "sqlState" : "22004", "messageParameters" : { - "varName" : "`sql_string`" + "varName" : "`null_var`" + } +} + + +-- !query +EXECUTE IMMEDIATE 'SELECT ?' 
USING (SELECT 1) +-- !query analysis +org.apache.spark.sql.AnalysisException +{ + "errorClass" : "UNSUPPORTED_EXPR_FOR_PARAMETER", + "sqlState" : "42K0E", + "messageParameters" : { + "invalidExprSql" : "\"scalarsubquery()\"" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 36, + "stopIndex" : 45, + "fragment" : "(SELECT 1)" + } ] +} + + +-- !query +EXECUTE IMMEDIATE 'SELECT :first' USING 2, 3 +-- !query analysis +org.apache.spark.sql.AnalysisException +{ + "errorClass" : "ALL_PARAMETERS_MUST_BE_NAMED", + "sqlState" : "07001", + "messageParameters" : { + "exprs" : "\"2\", \"3\"" + } +} + + +-- !query +EXECUTE IMMEDIATE (SELECT c FROM (VALUES(1)) AS T(c)) +-- !query analysis +org.apache.spark.sql.AnalysisException +{ + "errorClass" : "INVALID_EXPR_TYPE_FOR_QUERY_EXECUTE_IMMEDIATE", + "sqlState" : "42K09", + "messageParameters" : { + "exprType" : "\"INT\"" } } diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/parse-query-correctness-old-behavior.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/parse-query-correctness-old-behavior.sql.out index 1c9db4eaed6f..e73a4c547103 100644 --- a/sql/core/src/test/resources/sql-tests/analyzer-results/parse-query-correctness-old-behavior.sql.out +++ b/sql/core/src/test/resources/sql-tests/analyzer-results/parse-query-correctness-old-behavior.sql.out @@ -994,24 +994,36 @@ CreateVariable defaultvalueexpression(null, null), false -- !query EXECUTE IMMEDIATE 'SELECT 1 UNION SELECT 2 UNION SELECT 3' INTO v1 -- !query analysis -SetVariable [variablereference(system.session.v1=CAST(NULL AS INT))] -+- Project [1 AS UNION#x] - +- OneRowRelation +org.apache.spark.SparkException +{ + "errorClass" : "ROW_SUBQUERY_TOO_MANY_ROWS", + "sqlState" : "21000" +} -- !query SELECT v1 -- !query analysis -Project [variablereference(system.session.v1=1) AS v1#x] +Project [variablereference(system.session.v1=CAST(NULL AS INT)) AS v1#x] +- OneRowRelation -- !query EXECUTE IMMEDIATE 'SELECT 1 UNION SELECT 1 UNION SELECT 1' INTO v1 -- !query analysis -SetVariable [variablereference(system.session.v1=1)] -+- Project [1 AS UNION#x] - +- OneRowRelation +SetVariable [variablereference(system.session.v1=CAST(NULL AS INT))] ++- GlobalLimit 2 + +- LocalLimit 2 + +- Distinct + +- Union false, false + :- Distinct + : +- Union false, false + : :- Project [1 AS 1#x] + : : +- OneRowRelation + : +- Project [1 AS 1#x] + : +- OneRowRelation + +- Project [1 AS 1#x] + +- OneRowRelation -- !query @@ -1025,8 +1037,14 @@ Project [variablereference(system.session.v1=1) AS v1#x] EXECUTE IMMEDIATE 'SELECT 1 EXCEPT SELECT 2 EXCEPT SELECT 3' INTO v1 -- !query analysis SetVariable [variablereference(system.session.v1=1)] -+- Project [1 AS EXCEPT#x] - +- OneRowRelation ++- Except false + :- Except false + : :- Project [1 AS 1#x] + : : +- OneRowRelation + : +- Project [2 AS 2#x] + : +- OneRowRelation + +- Project [3 AS 3#x] + +- OneRowRelation -- !query @@ -1040,38 +1058,56 @@ Project [variablereference(system.session.v1=1) AS v1#x] EXECUTE IMMEDIATE 'SELECT 1 EXCEPT SELECT 1 EXCEPT SELECT 1' INTO v1 -- !query analysis SetVariable [variablereference(system.session.v1=1)] -+- Project [1 AS EXCEPT#x] - +- OneRowRelation ++- Except false + :- Except false + : :- Project [1 AS 1#x] + : : +- OneRowRelation + : +- Project [1 AS 1#x] + : +- OneRowRelation + +- Project [1 AS 1#x] + +- OneRowRelation -- !query SELECT v1 -- !query analysis -Project [variablereference(system.session.v1=1) AS v1#x] +Project [variablereference(system.session.v1=CAST(NULL 
AS INT)) AS v1#x] +- OneRowRelation -- !query EXECUTE IMMEDIATE 'SELECT 1 INTERSECT SELECT 2 INTERSECT SELECT 3' INTO v1 -- !query analysis -SetVariable [variablereference(system.session.v1=1)] -+- Project [1 AS INTERSECT#x] - +- OneRowRelation +SetVariable [variablereference(system.session.v1=CAST(NULL AS INT))] ++- Intersect false + :- Intersect false + : :- Project [1 AS 1#x] + : : +- OneRowRelation + : +- Project [2 AS 2#x] + : +- OneRowRelation + +- Project [3 AS 3#x] + +- OneRowRelation -- !query SELECT v1 -- !query analysis -Project [variablereference(system.session.v1=1) AS v1#x] +Project [variablereference(system.session.v1=CAST(NULL AS INT)) AS v1#x] +- OneRowRelation -- !query EXECUTE IMMEDIATE 'SELECT 1 INTERSECT SELECT 1 INTERSECT SELECT 1' INTO v1 -- !query analysis -SetVariable [variablereference(system.session.v1=1)] -+- Project [1 AS INTERSECT#x] - +- OneRowRelation +SetVariable [variablereference(system.session.v1=CAST(NULL AS INT))] ++- Intersect false + :- Intersect false + : :- Project [1 AS 1#x] + : : +- OneRowRelation + : +- Project [1 AS 1#x] + : +- OneRowRelation + +- Project [1 AS 1#x] + +- OneRowRelation -- !query @@ -1084,9 +1120,22 @@ Project [variablereference(system.session.v1=1) AS v1#x] -- !query EXECUTE IMMEDIATE 'SELECT 1 JOIN SELECT 2' INTO v1 -- !query analysis -SetVariable [variablereference(system.session.v1=1)] -+- Project [1 AS JOIN#x] - +- OneRowRelation +org.apache.spark.sql.catalyst.parser.ParseException +{ + "errorClass" : "PARSE_SYNTAX_ERROR", + "sqlState" : "42601", + "messageParameters" : { + "error" : "'SELECT'", + "hint" : "" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 1, + "stopIndex" : 50, + "fragment" : "EXECUTE IMMEDIATE 'SELECT 1 JOIN SELECT 2' INTO v1" + } ] +} -- !query @@ -1099,9 +1148,22 @@ Project [variablereference(system.session.v1=1) AS v1#x] -- !query EXECUTE IMMEDIATE 'SELECT 1 VALUES (1)' INTO v1 -- !query analysis -SetVariable [variablereference(system.session.v1=1)] -+- Project [1 AS VALUES#x] - +- OneRowRelation +org.apache.spark.sql.catalyst.parser.ParseException +{ + "errorClass" : "PARSE_SYNTAX_ERROR", + "sqlState" : "42601", + "messageParameters" : { + "error" : "'('", + "hint" : "" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 1, + "stopIndex" : 47, + "fragment" : "EXECUTE IMMEDIATE 'SELECT 1 VALUES (1)' INTO v1" + } ] +} -- !query @@ -1114,9 +1176,22 @@ Project [variablereference(system.session.v1=1) AS v1#x] -- !query EXECUTE IMMEDIATE 'SELECT 1 alias garbage garbage garbage' INTO v1 -- !query analysis -SetVariable [variablereference(system.session.v1=1)] -+- Project [1 AS alias#x] - +- OneRowRelation +org.apache.spark.sql.catalyst.parser.ParseException +{ + "errorClass" : "PARSE_SYNTAX_ERROR", + "sqlState" : "42601", + "messageParameters" : { + "error" : "'garbage'", + "hint" : "" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 1, + "stopIndex" : 66, + "fragment" : "EXECUTE IMMEDIATE 'SELECT 1 alias garbage garbage garbage' INTO v1" + } ] +} -- !query @@ -1129,9 +1204,22 @@ Project [variablereference(system.session.v1=1) AS v1#x] -- !query EXECUTE IMMEDIATE 'SELECT 1 WITH abc' INTO v1 -- !query analysis -SetVariable [variablereference(system.session.v1=1)] -+- Project [1 AS WITH#x] - +- OneRowRelation +org.apache.spark.sql.catalyst.parser.ParseException +{ + "errorClass" : "PARSE_SYNTAX_ERROR", + "sqlState" : "42601", + "messageParameters" : { + "error" : "'abc'", + "hint" : ": extra input 
'abc'" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 1, + "stopIndex" : 45, + "fragment" : "EXECUTE IMMEDIATE 'SELECT 1 WITH abc' INTO v1" + } ] +} -- !query diff --git a/sql/core/src/test/resources/sql-tests/inputs/execute-immediate.sql b/sql/core/src/test/resources/sql-tests/inputs/execute-immediate.sql index f7d27c6c0b03..3eaacbd37abc 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/execute-immediate.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/execute-immediate.sql @@ -87,6 +87,10 @@ EXECUTE IMMEDIATE 'SELECT \'invalid_cast_error_expected\'' INTO res_id; -- require query when using INTO EXECUTE IMMEDIATE 'INSERT INTO x VALUES (?)' INTO res_id USING 1; +-- require query when using INTO with SET VAR command +DECLARE OR REPLACE testvarA INT; +EXECUTE IMMEDIATE 'SET VAR testVarA = 1' INTO testVarA; + -- use column in using - should fail as we expect variable here EXECUTE IMMEDIATE 'SELECT * FROM tbl_view WHERE ? = id' USING id; @@ -109,8 +113,10 @@ EXECUTE IMMEDIATE b; SET VAR sql_string = 'SELECT * from tbl_view where name = :first or id = :second'; SET VAR a = 'na'; --- expressions not supported - feature not supported +-- constant expressions are supported EXECUTE IMMEDIATE 'SELECT * from tbl_view where name = :first' USING CONCAT(a , "me1") as first; + +-- subquery in using not supported EXECUTE IMMEDIATE 'SELECT * from tbl_view where name = :first' USING (SELECT 42) as first, 'name2' as second; -- INTO variables not matching scalar types @@ -140,10 +146,41 @@ EXECUTE IMMEDIATE 'SELECT id FROM tbl_view WHERE id = :p' USING p, 'p'; EXECUTE IMMEDIATE 'SELECT id, data.f1 FROM tbl_view WHERE id = 10' INTO res_id, res_id; -- nested execute immediate -EXECUTE IMMEDIATE 'EXECUTE IMMEDIATE \'SELECT id FROM tbl_view WHERE id = ? USING 10\''; +EXECUTE IMMEDIATE 'EXECUTE IMMEDIATE \'SELECT id FROM tbl_view WHERE id = ?\' USING 10'; -- sqlString is null SET VAR sql_string = null; EXECUTE IMMEDIATE sql_string; -DROP TABLE x; \ No newline at end of file +-- sqlString is not a string +SET VAR sql_string = 5; +EXECUTE IMMEDIATE sql_string; + +-- sqlString is not a well formed SQL statement. +SET VAR sql_string = 'hello'; +EXECUTE IMMEDIATE length(sql_string); + +-- mixed positional and named parameters in query +EXECUTE IMMEDIATE 'SELECT 42 where ? = :first' USING 1, 2 as first; + +-- non-string variable as sqlString parameter +DECLARE int_var INT; +SET VAR int_var = 42; +EXECUTE IMMEDIATE int_var; + +-- null string as sqlString parameter +DECLARE null_var STRING; +SET VAR null_var = null; +EXECUTE IMMEDIATE null_var; + +-- unsupported expression for parameter (subquery) +EXECUTE IMMEDIATE 'SELECT ?' USING (SELECT 1); + +-- named query with unnamed parameters +EXECUTE IMMEDIATE 'SELECT :first' USING 2, 3; + +-- Query is not a constant +EXECUTE IMMEDIATE (SELECT c FROM (VALUES(1)) AS T(c)); + +DROP TABLE x; + diff --git a/sql/core/src/test/resources/sql-tests/results/execute-immediate.sql.out b/sql/core/src/test/resources/sql-tests/results/execute-immediate.sql.out index 9249d7eb3e51..8b44bef68c16 100644 --- a/sql/core/src/test/resources/sql-tests/results/execute-immediate.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/execute-immediate.sql.out @@ -423,17 +423,39 @@ org.apache.spark.sql.AnalysisException -- !query -EXECUTE IMMEDIATE 'SELECT * FROM tbl_view WHERE ? 
= id' USING id +DECLARE OR REPLACE testvarA INT +-- !query schema +struct<> +-- !query output + + + +-- !query +EXECUTE IMMEDIATE 'SET VAR testVarA = 1' INTO testVarA -- !query schema struct<> -- !query output org.apache.spark.sql.AnalysisException { - "errorClass" : "UNRESOLVED_VARIABLE", - "sqlState" : "42883", + "errorClass" : "INVALID_STATEMENT_FOR_EXECUTE_INTO", + "sqlState" : "07501", + "messageParameters" : { + "sqlString" : "SET VAR TESTVARA = 1" + } +} + + +-- !query +EXECUTE IMMEDIATE 'SELECT * FROM tbl_view WHERE ? = id' USING id +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.catalyst.ExtendedAnalysisException +{ + "errorClass" : "UNRESOLVED_COLUMN.WITHOUT_SUGGESTION", + "sqlState" : "42703", "messageParameters" : { - "searchPath" : "`system`.`session`", - "variableName" : "`id`" + "objectName" : "`id`" }, "queryContext" : [ { "objectType" : "", @@ -525,10 +547,10 @@ struct<> -- !query output org.apache.spark.sql.AnalysisException { - "errorClass" : "INVALID_VARIABLE_TYPE_FOR_QUERY_EXECUTE_IMMEDIATE", + "errorClass" : "INVALID_EXPR_TYPE_FOR_QUERY_EXECUTE_IMMEDIATE", "sqlState" : "42K09", "messageParameters" : { - "varType" : "\"INT\"" + "exprType" : "\"INT\"" } } @@ -552,23 +574,9 @@ struct<> -- !query EXECUTE IMMEDIATE 'SELECT * from tbl_view where name = :first' USING CONCAT(a , "me1") as first -- !query schema -struct<> +struct<id:int,name:string,data:struct<f1:int,s2:struct<f2:int,f3:string>>> -- !query output -org.apache.spark.sql.AnalysisException -{ - "errorClass" : "UNSUPPORTED_EXPR_FOR_PARAMETER", - "sqlState" : "42K0E", - "messageParameters" : { - "invalidExprSql" : "\"CONCAT(a, me1)\"" - }, - "queryContext" : [ { - "objectType" : "", - "objectName" : "", - "startIndex" : 70, - "stopIndex" : 86, - "fragment" : "CONCAT(a , \"me1\")" - } ] -} +10 name1 {"f1":1,"s2":{"f2":101,"f3":"a"}} -- !query @@ -747,22 +755,38 @@ org.apache.spark.sql.AnalysisException -- !query -EXECUTE IMMEDIATE 'EXECUTE IMMEDIATE \'SELECT id FROM tbl_view WHERE id = ? USING 10\'' +EXECUTE IMMEDIATE 'EXECUTE IMMEDIATE \'SELECT id FROM tbl_view WHERE id = ?\' USING 10' +-- !query schema +struct<id:int> +-- !query output +10 + + +-- !query +SET VAR sql_string = null +-- !query schema +struct<> +-- !query output + + + +-- !query +EXECUTE IMMEDIATE sql_string -- !query schema struct<> -- !query output org.apache.spark.sql.AnalysisException { - "errorClass" : "NESTED_EXECUTE_IMMEDIATE", - "sqlState" : "07501", + "errorClass" : "NULL_QUERY_STRING_EXECUTE_IMMEDIATE", + "sqlState" : "22004", "messageParameters" : { - "sqlString" : "EXECUTE IMMEDIATE 'SELECT ID FROM TBL_VIEW WHERE ID = ? 
USING 10'" + "varName" : "`sql_string`" } } -- !query -SET VAR sql_string = null +SET VAR sql_string = 5 -- !query schema struct<> -- !query output @@ -774,12 +798,169 @@ EXECUTE IMMEDIATE sql_string -- !query schema struct<> -- !query output +org.apache.spark.sql.catalyst.parser.ParseException +{ + "errorClass" : "PARSE_SYNTAX_ERROR", + "sqlState" : "42601", + "messageParameters" : { + "error" : "'5'", + "hint" : "" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 1, + "stopIndex" : 28, + "fragment" : "EXECUTE IMMEDIATE sql_string" + } ] +} + + +-- !query +SET VAR sql_string = 'hello' +-- !query schema +struct<> +-- !query output + + + +-- !query +EXECUTE IMMEDIATE length(sql_string) +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.AnalysisException +{ + "errorClass" : "INVALID_EXPR_TYPE_FOR_QUERY_EXECUTE_IMMEDIATE", + "sqlState" : "42K09", + "messageParameters" : { + "exprType" : "\"INT\"" + } +} + + +-- !query +EXECUTE IMMEDIATE 'SELECT 42 where ? = :first' USING 1, 2 as first +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.AnalysisException +{ + "errorClass" : "INVALID_QUERY_MIXED_QUERY_PARAMETERS", + "sqlState" : "42613" +} + + +-- !query +DECLARE int_var INT +-- !query schema +struct<> +-- !query output + + + +-- !query +SET VAR int_var = 42 +-- !query schema +struct<> +-- !query output + + + +-- !query +EXECUTE IMMEDIATE int_var +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.AnalysisException +{ + "errorClass" : "INVALID_EXPR_TYPE_FOR_QUERY_EXECUTE_IMMEDIATE", + "sqlState" : "42K09", + "messageParameters" : { + "exprType" : "\"INT\"" + } +} + + +-- !query +DECLARE null_var STRING +-- !query schema +struct<> +-- !query output + + + +-- !query +SET VAR null_var = null +-- !query schema +struct<> +-- !query output + + + +-- !query +EXECUTE IMMEDIATE null_var +-- !query schema +struct<> +-- !query output org.apache.spark.sql.AnalysisException { "errorClass" : "NULL_QUERY_STRING_EXECUTE_IMMEDIATE", "sqlState" : "22004", "messageParameters" : { - "varName" : "`sql_string`" + "varName" : "`null_var`" + } +} + + +-- !query +EXECUTE IMMEDIATE 'SELECT ?' 
USING (SELECT 1) +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.AnalysisException +{ + "errorClass" : "UNSUPPORTED_EXPR_FOR_PARAMETER", + "sqlState" : "42K0E", + "messageParameters" : { + "invalidExprSql" : "\"scalarsubquery()\"" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 36, + "stopIndex" : 45, + "fragment" : "(SELECT 1)" + } ] +} + + +-- !query +EXECUTE IMMEDIATE 'SELECT :first' USING 2, 3 +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.AnalysisException +{ + "errorClass" : "ALL_PARAMETERS_MUST_BE_NAMED", + "sqlState" : "07001", + "messageParameters" : { + "exprs" : "\"2\", \"3\"" + } +} + + +-- !query +EXECUTE IMMEDIATE (SELECT c FROM (VALUES(1)) AS T(c)) +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.AnalysisException +{ + "errorClass" : "INVALID_EXPR_TYPE_FOR_QUERY_EXECUTE_IMMEDIATE", + "sqlState" : "42K09", + "messageParameters" : { + "exprType" : "\"INT\"" } } diff --git a/sql/core/src/test/resources/sql-tests/results/parse-query-correctness-old-behavior.sql.out b/sql/core/src/test/resources/sql-tests/results/parse-query-correctness-old-behavior.sql.out index 6c9751864324..9a523e9562f9 100644 --- a/sql/core/src/test/resources/sql-tests/results/parse-query-correctness-old-behavior.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/parse-query-correctness-old-behavior.sql.out @@ -614,7 +614,11 @@ EXECUTE IMMEDIATE 'SELECT 1 UNION SELECT 2 UNION SELECT 3' INTO v1 -- !query schema struct<> -- !query output - +org.apache.spark.SparkException +{ + "errorClass" : "ROW_SUBQUERY_TOO_MANY_ROWS", + "sqlState" : "21000" +} -- !query @@ -622,7 +626,7 @@ SELECT v1 -- !query schema struct<v1:int> -- !query output -1 +NULL -- !query @@ -670,7 +674,7 @@ SELECT v1 -- !query schema struct<v1:int> -- !query output -1 +NULL -- !query @@ -686,7 +690,7 @@ SELECT v1 -- !query schema struct<v1:int> -- !query output -1 +NULL -- !query @@ -710,7 +714,22 @@ EXECUTE IMMEDIATE 'SELECT 1 JOIN SELECT 2' INTO v1 -- !query schema struct<> -- !query output - +org.apache.spark.sql.catalyst.parser.ParseException +{ + "errorClass" : "PARSE_SYNTAX_ERROR", + "sqlState" : "42601", + "messageParameters" : { + "error" : "'SELECT'", + "hint" : "" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 1, + "stopIndex" : 50, + "fragment" : "EXECUTE IMMEDIATE 'SELECT 1 JOIN SELECT 2' INTO v1" + } ] +} -- !query @@ -726,7 +745,22 @@ EXECUTE IMMEDIATE 'SELECT 1 VALUES (1)' INTO v1 -- !query schema struct<> -- !query output - +org.apache.spark.sql.catalyst.parser.ParseException +{ + "errorClass" : "PARSE_SYNTAX_ERROR", + "sqlState" : "42601", + "messageParameters" : { + "error" : "'('", + "hint" : "" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 1, + "stopIndex" : 47, + "fragment" : "EXECUTE IMMEDIATE 'SELECT 1 VALUES (1)' INTO v1" + } ] +} -- !query @@ -742,7 +776,22 @@ EXECUTE IMMEDIATE 'SELECT 1 alias garbage garbage garbage' INTO v1 -- !query schema struct<> -- !query output - +org.apache.spark.sql.catalyst.parser.ParseException +{ + "errorClass" : "PARSE_SYNTAX_ERROR", + "sqlState" : "42601", + "messageParameters" : { + "error" : "'garbage'", + "hint" : "" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 1, + "stopIndex" : 66, + "fragment" : "EXECUTE IMMEDIATE 'SELECT 1 alias garbage garbage garbage' INTO v1" + } ] +} -- !query @@ -758,7 +807,22 @@ EXECUTE IMMEDIATE 'SELECT 1 WITH abc' INTO v1 -- !query 
schema struct<> -- !query output - +org.apache.spark.sql.catalyst.parser.ParseException +{ + "errorClass" : "PARSE_SYNTAX_ERROR", + "sqlState" : "42601", + "messageParameters" : { + "error" : "'abc'", + "hint" : ": extra input 'abc'" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 1, + "stopIndex" : 45, + "fragment" : "EXECUTE IMMEDIATE 'SELECT 1 WITH abc' INTO v1" + } ] +} -- !query diff --git a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryParsingErrorsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryParsingErrorsSuite.scala index 666f85e19c1c..629d85f19b0a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryParsingErrorsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryParsingErrorsSuite.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.errors import org.apache.spark.SparkThrowable -import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.{AnalysisException, QueryTest} import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.catalyst.plans.SQLHelper import org.apache.spark.sql.catalyst.util.TypeUtils.toSQLId @@ -62,16 +62,16 @@ class QueryParsingErrorsSuite extends QueryTest with SharedSparkSession with SQL ) } - test("PARSE_SYNTAX_ERROR: Execute immediate syntax error with INTO specified") { + test("UNRESOLVED_COLUMN: Execute immediate with unresolved column in INTO clause") { val query = "EXECUTE IMMEDIATE 'SELCT 1707 WHERE ? = 1' INTO a USING 1" checkError( - exception = parseException(query), - condition = "PARSE_SYNTAX_ERROR", - parameters = Map("error" -> "'SELCT'", "hint" -> ""), + exception = intercept[AnalysisException](sql(query)), + condition = "UNRESOLVED_COLUMN.WITHOUT_SUGGESTION", + parameters = Map("objectName" -> "`a`"), context = ExpectedContext( - start = 0, - stop = 56, - fragment = query) + start = 48, + stop = 48, + fragment = "a") ) } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala index f9bebce7cbfa..aa801b6e2f68 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala @@ -25,7 +25,7 @@ import org.apache.hadoop.hive.ql.exec.{UDAF, UDF} import org.apache.hadoop.hive.ql.udf.generic.{AbstractGenericUDAFResolver, GenericUDF, GenericUDTF} import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.analysis.{Analyzer, EvalSubqueriesForTimeTravel, InvokeProcedures, ReplaceCharWithVarchar, ResolveDataSource, ResolveSessionCatalog, ResolveTranspose} +import org.apache.spark.sql.catalyst.analysis.{Analyzer, EvalSubqueriesForTimeTravel, InvokeProcedures, ReplaceCharWithVarchar, ResolveDataSource, ResolveExecuteImmediate, ResolveSessionCatalog, ResolveTranspose} import org.apache.spark.sql.catalyst.analysis.resolver.ResolverExtension import org.apache.spark.sql.catalyst.catalog.{ExternalCatalogWithListener, InvalidUDFClassException} import org.apache.spark.sql.catalyst.expressions.{Expression, ExtractSemiStructuredFields} @@ -133,6 +133,7 @@ class HiveSessionStateBuilder( new DetermineTableStats(session) +: new ResolveTranspose(session) +: new InvokeProcedures(session) +: + ResolveExecuteImmediate(session, catalogManager) +: ExtractSemiStructuredFields +: customResolutionRules --------------------------------------------------------------------- To 
unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org