This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push:
new 7313d71438e4 [SPARK-48273][SQL] Fix late rewrite of PlanWithUnresolvedIdentifier
7313d71438e4 is described below
commit 7313d71438e4691f7c086e90ded4a6f644cdcdc5
Author: Nikola Mandic <[email protected]>
AuthorDate: Tue May 28 09:59:53 2024 -0700
[SPARK-48273][SQL] Fix late rewrite of PlanWithUnresolvedIdentifier
### What changes were proposed in this pull request?
`PlanWithUnresolvedIdentifier` is rewritten late in analysis (by `ResolveIdentifierClause` in the Resolution batch), so rules from earlier batches, such as `SubstituteUnresolvedOrdinals`, never see the rewritten plan. As a result, the following queries fail:
```
create temporary view identifier('v1') as (select my_col from (values (1),
(2), (1) as (my_col)) group by 1);
--
cache table identifier('t1') as (select my_col from (values (1), (2), (1)
as (my_col)) group by 1);
--
create table identifier('t2') as (select my_col from (values (1), (2), (1)
as (my_col)) group by 1);
insert into identifier('t2') select my_col from (values (3) as (my_col))
group by 1;
```
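Fix this by splitting out the analyzer batches that precede Resolution (starting with Substitution) as `earlyBatches`, and explicitly applying them to the plan produced by the rewrite.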
### Why are the changes needed?
To fix the described bug.
### Does this PR introduce _any_ user-facing change?
Yes. The queries above, which previously failed, now succeed.
### How was this patch tested?
Added the queries above as new cases in the existing `identifier-clause.sql` test and regenerated its golden files.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #46580 from nikolamand-db/SPARK-48273.
Authored-by: Nikola Mandic <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 731a2cfcffaeeeb1f1c107080ca77000330d79b5)
Signed-off-by: Wenchen Fan <[email protected]>
---
.../spark/sql/catalyst/analysis/Analyzer.scala | 9 ++--
.../analysis/ResolveIdentifierClause.scala | 11 ++--
.../spark/sql/catalyst/rules/RuleExecutor.scala | 2 +-
.../analyzer-results/identifier-clause.sql.out | 59 ++++++++++++++++++++++
.../sql-tests/inputs/identifier-clause.sql | 9 ++++
.../sql-tests/results/identifier-clause.sql.out | 56 ++++++++++++++++++++
6 files changed, 139 insertions(+), 7 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index ed7b978045c7..5890a9692e20 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -255,7 +255,7 @@ class Analyzer(override val catalogManager: CatalogManager)
extends RuleExecutor
TypeCoercion.typeCoercionRules
}
- override def batches: Seq[Batch] = Seq(
+ private def earlyBatches: Seq[Batch] = Seq(
Batch("Substitution", fixedPoint,
// This rule optimizes `UpdateFields` expression chains so looks more
like optimization rule.
// However, when manipulating deeply nested schema, `UpdateFields`
expression tree could be
@@ -275,7 +275,10 @@ class Analyzer(override val catalogManager:
CatalogManager) extends RuleExecutor
Batch("Simple Sanity Check", Once,
LookupFunctions),
Batch("Keep Legacy Outputs", Once,
- KeepLegacyOutputs),
+ KeepLegacyOutputs)
+ )
+
+ override def batches: Seq[Batch] = earlyBatches ++ Seq(
Batch("Resolution", fixedPoint,
new ResolveCatalogs(catalogManager) ::
ResolveInsertInto ::
@@ -319,7 +322,7 @@ class Analyzer(override val catalogManager: CatalogManager)
extends RuleExecutor
ResolveTimeZone ::
ResolveRandomSeed ::
ResolveBinaryArithmetic ::
- ResolveIdentifierClause ::
+ new ResolveIdentifierClause(earlyBatches) ::
ResolveUnion ::
ResolveRowLevelCommandAssignments ::
RewriteDeleteFromTable ::
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveIdentifierClause.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveIdentifierClause.scala
index e0d3e5629ef8..422bad3d89e2 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveIdentifierClause.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveIdentifierClause.scala
@@ -20,19 +20,24 @@ package org.apache.spark.sql.catalyst.analysis
import org.apache.spark.sql.catalyst.expressions.{AliasHelper, EvalHelper,
Expression}
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}
import org.apache.spark.sql.catalyst.trees.TreePattern.UNRESOLVED_IDENTIFIER
import org.apache.spark.sql.types.StringType
/**
* Resolves the identifier expressions and builds the original
plans/expressions.
*/
-object ResolveIdentifierClause extends Rule[LogicalPlan] with AliasHelper with
EvalHelper {
+class ResolveIdentifierClause(earlyBatches:
Seq[RuleExecutor[LogicalPlan]#Batch])
+ extends Rule[LogicalPlan] with AliasHelper with EvalHelper {
+
+ private val executor = new RuleExecutor[LogicalPlan] {
+ override def batches: Seq[Batch] = earlyBatches.asInstanceOf[Seq[Batch]]
+ }
override def apply(plan: LogicalPlan): LogicalPlan =
plan.resolveOperatorsUpWithPruning(
_.containsAnyPattern(UNRESOLVED_IDENTIFIER)) {
case p: PlanWithUnresolvedIdentifier if p.identifierExpr.resolved =>
- p.planBuilder.apply(evalIdentifierExpr(p.identifierExpr))
+
executor.execute(p.planBuilder.apply(evalIdentifierExpr(p.identifierExpr)))
case other =>
other.transformExpressionsWithPruning(_.containsAnyPattern(UNRESOLVED_IDENTIFIER))
{
case e: ExpressionWithUnresolvedIdentifier if
e.identifierExpr.resolved =>
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
index 9d29ca1f9c6e..c16b50a2b17a 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
@@ -143,7 +143,7 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]]
extends Logging {
override val maxIterationsSetting: String = null) extends Strategy
/** A batch of rules. */
- protected case class Batch(name: String, strategy: Strategy, rules:
Rule[TreeType]*)
+ protected[catalyst] case class Batch(name: String, strategy: Strategy,
rules: Rule[TreeType]*)
/** Defines a sequence of rule batches, to be overridden by the
implementation. */
protected def batches: Seq[Batch]
diff --git
a/sql/core/src/test/resources/sql-tests/analyzer-results/identifier-clause.sql.out
b/sql/core/src/test/resources/sql-tests/analyzer-results/identifier-clause.sql.out
index f91d0a26cf8a..823ce43247a7 100644
---
a/sql/core/src/test/resources/sql-tests/analyzer-results/identifier-clause.sql.out
+++
b/sql/core/src/test/resources/sql-tests/analyzer-results/identifier-clause.sql.out
@@ -881,6 +881,65 @@ org.apache.spark.sql.catalyst.parser.ParseException
}
+-- !query
+create temporary view identifier('v1') as (select my_col from (values (1),
(2), (1) as (my_col)) group by 1)
+-- !query analysis
+CreateViewCommand `v1`, (select my_col from (values (1), (2), (1) as (my_col))
group by 1), false, false, LocalTempView, UNSUPPORTED, true
+ +- Aggregate [my_col#x], [my_col#x]
+ +- SubqueryAlias __auto_generated_subquery_name
+ +- SubqueryAlias as
+ +- LocalRelation [my_col#x]
+
+
+-- !query
+cache table identifier('t1') as (select my_col from (values (1), (2), (1) as
(my_col)) group by 1)
+-- !query analysis
+CacheTableAsSelect t1, (select my_col from (values (1), (2), (1) as (my_col))
group by 1), false, true
+ +- Aggregate [my_col#x], [my_col#x]
+ +- SubqueryAlias __auto_generated_subquery_name
+ +- SubqueryAlias as
+ +- LocalRelation [my_col#x]
+
+
+-- !query
+create table identifier('t2') as (select my_col from (values (1), (2), (1) as
(my_col)) group by 1)
+-- !query analysis
+CreateDataSourceTableAsSelectCommand `spark_catalog`.`default`.`t2`,
ErrorIfExists, [my_col]
+ +- Aggregate [my_col#x], [my_col#x]
+ +- SubqueryAlias __auto_generated_subquery_name
+ +- SubqueryAlias as
+ +- LocalRelation [my_col#x]
+
+
+-- !query
+insert into identifier('t2') select my_col from (values (3) as (my_col)) group
by 1
+-- !query analysis
+InsertIntoHadoopFsRelationCommand file:[not included in
comparison]/{warehouse_dir}/t2, false, Parquet, [path=file:[not included in
comparison]/{warehouse_dir}/t2], Append, `spark_catalog`.`default`.`t2`,
org.apache.spark.sql.execution.datasources.InMemoryFileIndex(file:[not included
in comparison]/{warehouse_dir}/t2), [my_col]
++- Aggregate [my_col#x], [my_col#x]
+ +- SubqueryAlias __auto_generated_subquery_name
+ +- SubqueryAlias as
+ +- LocalRelation [my_col#x]
+
+
+-- !query
+drop view v1
+-- !query analysis
+DropTempViewCommand v1
+
+
+-- !query
+drop table t1
+-- !query analysis
+DropTempViewCommand t1
+
+
+-- !query
+drop table t2
+-- !query analysis
+DropTable false, false
++- ResolvedIdentifier V2SessionCatalog(spark_catalog), default.t2
+
+
-- !query
SELECT row_number() OVER IDENTIFIER('x.win') FROM VALUES(1) AS T(c1) WINDOW
win AS (ORDER BY c1)
-- !query analysis
diff --git a/sql/core/src/test/resources/sql-tests/inputs/identifier-clause.sql
b/sql/core/src/test/resources/sql-tests/inputs/identifier-clause.sql
index 07ae15707293..9e6314202b5f 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/identifier-clause.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/identifier-clause.sql
@@ -122,6 +122,15 @@ CREATE TEMPORARY FUNCTION IDENTIFIER('default.my' ||
'DoubleAvg') AS 'test.org.a
DROP TEMPORARY FUNCTION IDENTIFIER('default.my' || 'DoubleAvg');
CREATE TEMPORARY VIEW IDENTIFIER('default.v')(c1) AS VALUES(1);
+-- SPARK-48273: Aggregation operation in statements using identifier clause
for table name
+create temporary view identifier('v1') as (select my_col from (values (1),
(2), (1) as (my_col)) group by 1);
+cache table identifier('t1') as (select my_col from (values (1), (2), (1) as
(my_col)) group by 1);
+create table identifier('t2') as (select my_col from (values (1), (2), (1) as
(my_col)) group by 1);
+insert into identifier('t2') select my_col from (values (3) as (my_col)) group
by 1;
+drop view v1;
+drop table t1;
+drop table t2;
+
-- Not supported
SELECT row_number() OVER IDENTIFIER('x.win') FROM VALUES(1) AS T(c1) WINDOW
win AS (ORDER BY c1);
SELECT T1.c1 FROM VALUES(1) AS T1(c1) JOIN VALUES(1) AS T2(c1) USING
(IDENTIFIER('c1'));
diff --git
a/sql/core/src/test/resources/sql-tests/results/identifier-clause.sql.out
b/sql/core/src/test/resources/sql-tests/results/identifier-clause.sql.out
index ed87f69fc5e6..4d62c371a171 100644
--- a/sql/core/src/test/resources/sql-tests/results/identifier-clause.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/identifier-clause.sql.out
@@ -1010,6 +1010,62 @@ org.apache.spark.sql.catalyst.parser.ParseException
}
+-- !query
+create temporary view identifier('v1') as (select my_col from (values (1),
(2), (1) as (my_col)) group by 1)
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+cache table identifier('t1') as (select my_col from (values (1), (2), (1) as
(my_col)) group by 1)
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+create table identifier('t2') as (select my_col from (values (1), (2), (1) as
(my_col)) group by 1)
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+insert into identifier('t2') select my_col from (values (3) as (my_col)) group
by 1
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+drop view v1
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+drop table t1
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+drop table t2
+-- !query schema
+struct<>
+-- !query output
+
+
+
-- !query
SELECT row_number() OVER IDENTIFIER('x.win') FROM VALUES(1) AS T(c1) WINDOW
win AS (ORDER BY c1)
-- !query schema
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]