This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new b5241c9 [SPARK-34701][SQL] Introduce AnalysisOnlyCommand that allows its children to be removed once the command is marked as analyzed
b5241c9 is described below
commit b5241c97b17a1139a4ff719bfce7f68aef094d95
Author: Terry Kim <[email protected]>
AuthorDate: Wed Apr 14 08:24:25 2021 +0000
[SPARK-34701][SQL] Introduce AnalysisOnlyCommand that allows its children to be removed once the command is marked as analyzed
### What changes were proposed in this pull request?
This PR proposes to introduce the `AnalysisOnlyCommand` trait such that a
command that extends this trait can have its children analyzed, but not
optimized. There is a corresponding analysis rule `HandleAnalysisOnlyCommand`
that marks the command as analyzed after all other analysis rules have run.
This is useful when a logical plan has children that need to be analyzed but
not optimized, e.g., `CREATE VIEW` or `CACHE TABLE AS`.
This also addresses the issue found in #31933.
This PR also updates `CreateViewCommand`, `CacheTableAsSelect`, and
`AlterViewAsCommand` to use the new trait / rule such that their children are
only analyzed.
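As a rough illustration of the new contract: a command opts in by implementing the trait's three members, and once the command is marked as analyzed, the trait's final `children` returns `Nil`, so the optimizer never descends into the plan. Below is a minimal hedged sketch; `MyViewLikeCommand` and its fields are hypothetical and not part of this change:

```scala
import org.apache.spark.sql.catalyst.plans.logical.{AnalysisOnlyCommand, LogicalPlan}

// Hypothetical command, for illustration only.
case class MyViewLikeCommand(
    plan: LogicalPlan,
    isAnalyzed: Boolean = false) extends AnalysisOnlyCommand {

  // Exposed to the analyzer; hidden from the optimizer once `isAnalyzed` is true.
  override def childrenToAnalyze: Seq[LogicalPlan] = plan :: Nil

  // Invoked by the HandleAnalysisOnlyCommand rule after all other analysis rules have run.
  override def markAsAnalyzed(): LogicalPlan = copy(isAnalyzed = true)

  // Only reachable while the command still exposes children, i.e. before it is marked analyzed.
  override protected def withNewChildrenInternal(
      newChildren: IndexedSeq[LogicalPlan]): MyViewLikeCommand = {
    assert(!isAnalyzed)
    copy(plan = newChildren.head)
  }
}
```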
### Why are the changes needed?
To address the issue where the plan is unnecessarily re-analyzed in
`CreateViewCommand`.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing tests should cover the changes.
Closes #32032 from imback82/skip_transform.
Authored-by: Terry Kim <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../spark/sql/catalyst/analysis/Analyzer.scala | 16 +++++++-
.../spark/sql/catalyst/plans/logical/Command.scala | 11 +++++
.../sql/catalyst/plans/logical/v2Commands.scala | 13 +++++-
.../main/scala/org/apache/spark/sql/Dataset.scala | 5 ++-
.../catalyst/analysis/ResolveSessionCatalog.scala | 22 +++++-----
.../apache/spark/sql/execution/command/views.scala | 47 ++++++++++++++++------
.../execution/datasources/v2/CacheTableExec.scala | 26 ++++++------
.../sql-tests/results/explain-aqe.sql.out | 2 +-
.../resources/sql-tests/results/explain.sql.out | 2 +-
9 files changed, 102 insertions(+), 42 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 7a11396..e7c72db 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -309,7 +309,9 @@ class Analyzer(override val catalogManager: CatalogManager)
Batch("Subquery", Once,
UpdateOuterReferences),
Batch("Cleanup", fixedPoint,
- CleanupAliases)
+ CleanupAliases),
+ Batch("HandleAnalysisOnlyCommand", Once,
+ HandleAnalysisOnlyCommand)
)
/**
@@ -3543,6 +3545,18 @@ class Analyzer(override val catalogManager: CatalogManager)
}
}
}
+
+ /**
+ * A rule that marks a command as analyzed so that its children are removed to avoid
+ * being optimized. This rule should run after all other analysis rules are run.
+ */
+ object HandleAnalysisOnlyCommand extends Rule[LogicalPlan] {
+ override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+ case c: AnalysisOnlyCommand if c.resolved =>
+ checkAnalysis(c)
+ c.markAsAnalyzed()
+ }
+ }
}
/**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Command.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Command.scala
index 94ead5e..81ad92b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Command.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Command.scala
@@ -37,3 +37,14 @@ trait Command extends LogicalPlan {
trait LeafCommand extends Command with LeafLike[LogicalPlan]
trait UnaryCommand extends Command with UnaryLike[LogicalPlan]
trait BinaryCommand extends Command with BinaryLike[LogicalPlan]
+
+/**
+ * A logical node that can be used for a command that requires its children to be only analyzed,
+ * but not optimized.
+ */
+trait AnalysisOnlyCommand extends Command {
+ val isAnalyzed: Boolean
+ def childrenToAnalyze: Seq[LogicalPlan]
+ override final def children: Seq[LogicalPlan] = if (isAnalyzed) Nil else childrenToAnalyze
+ def markAsAnalyzed(): LogicalPlan
+}
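A hedged usage note on the trait above: because the final `children` flips to `Nil` once `isAnalyzed` is true, marking a command as analyzed is what shields its plan from the optimizer batches. A small lifecycle sketch, reusing the hypothetical `MyViewLikeCommand` from the description above (`OneRowRelation` serves only as a stand-in plan):

```scala
import org.apache.spark.sql.catalyst.plans.logical.OneRowRelation

val cmd = MyViewLikeCommand(plan = OneRowRelation())
assert(cmd.children == cmd.childrenToAnalyze) // pre-analysis: rules can recurse into the plan
val done = cmd.markAsAnalyzed()
assert(done.children.isEmpty)                 // post-analysis: the optimizer sees a leaf
```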
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
index 8b7f2db..bbc2b62 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
@@ -1022,7 +1022,18 @@ case class CacheTableAsSelect(
plan: LogicalPlan,
originalText: String,
isLazy: Boolean,
- options: Map[String, String]) extends LeafCommand
+ options: Map[String, String],
+ isAnalyzed: Boolean = false) extends AnalysisOnlyCommand {
+ override protected def withNewChildrenInternal(
+ newChildren: IndexedSeq[LogicalPlan]): CacheTableAsSelect = {
+ assert(!isAnalyzed)
+ copy(plan = newChildren.head)
+ }
+
+ override def childrenToAnalyze: Seq[LogicalPlan] = plan :: Nil
+
+ override def markAsAnalyzed(): LogicalPlan = copy(isAnalyzed = true)
+}
/**
* The logical plan of the UNCACHE TABLE command.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index fd02d0b..540115b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -3370,10 +3370,11 @@ class Dataset[T] private[sql](
comment = None,
properties = Map.empty,
originalText = None,
- child = logicalPlan,
+ plan = logicalPlan,
allowExisting = false,
replace = replace,
- viewType = viewType)
+ viewType = viewType,
+ isAnalyzed = true)
}
/**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index f9b9e5a..7f3d0c6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -474,7 +474,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
case SetTableLocation(ResolvedV1TableIdentifier(ident), partitionSpec, location) =>
AlterTableSetLocationCommand(ident.asTableIdentifier, partitionSpec, location)
- case AlterViewAs(ResolvedView(ident, _), originalText, query) if query.resolved =>
+ case AlterViewAs(ResolvedView(ident, _), originalText, query) =>
AlterViewAsCommand(
ident.asTableIdentifier,
originalText,
@@ -482,7 +482,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
case CreateViewStatement(
tbl, userSpecifiedColumns, comment, properties,
- originalText, child, allowExisting, replace, viewType) if child.resolved =>
+ originalText, child, allowExisting, replace, viewType) =>
val v1TableName = if (viewType != PersistedView) {
// temp view doesn't belong to any catalog and we shouldn't resolve catalog in the name.
@@ -491,15 +491,15 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
parseV1Table(tbl, "CREATE VIEW")
}
CreateViewCommand(
- v1TableName.asTableIdentifier,
- userSpecifiedColumns,
- comment,
- properties,
- originalText,
- child,
- allowExisting,
- replace,
- viewType)
+ name = v1TableName.asTableIdentifier,
+ userSpecifiedColumns = userSpecifiedColumns,
+ comment = comment,
+ properties = properties,
+ originalText = originalText,
+ plan = child,
+ allowExisting = allowExisting,
+ replace = replace,
+ viewType = viewType)
case ShowViews(resolved: ResolvedNamespace, pattern, output) =>
resolved match {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
index 93ea226..10ec4be 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.analysis.{GlobalTempView, LocalTempView, Pe
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType, SessionCatalog, TemporaryViewRelation}
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, SubqueryExpression, UserDefinedExpression}
import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, View}
+import org.apache.spark.sql.catalyst.plans.logical.{AnalysisOnlyCommand, LogicalPlan, Project, View}
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.NamespaceHelper
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
@@ -48,13 +48,14 @@ import org.apache.spark.sql.util.SchemaUtils
* @param properties the properties of this view.
* @param originalText the original SQL text of this view, can be None if this view is created via
*                     Dataset API.
- * @param child the logical plan that represents the view; this is used to generate the logical
- *              plan for temporary view and the view schema.
+ * @param plan the logical plan that represents the view; this is used to generate the logical
+ *             plan for temporary view and the view schema.
* @param allowExisting if true, and if the view already exists, noop; if false, and if the view
*                      already exists, throws analysis exception.
* @param replace if true, and if the view already exists, updates it; if false, and if the view
*                already exists, throws analysis exception.
* @param viewType the expected view type to be created with this command.
+ * @param isAnalyzed whether this command is analyzed or not.
*/
case class CreateViewCommand(
name: TableIdentifier,
@@ -62,15 +63,26 @@ case class CreateViewCommand(
comment: Option[String],
properties: Map[String, String],
originalText: Option[String],
- child: LogicalPlan,
+ plan: LogicalPlan,
allowExisting: Boolean,
replace: Boolean,
- viewType: ViewType)
- extends LeafRunnableCommand {
+ viewType: ViewType,
+ isAnalyzed: Boolean = false) extends RunnableCommand with AnalysisOnlyCommand {
import ViewHelper._
- override def innerChildren: Seq[QueryPlan[_]] = Seq(child)
+ override protected def withNewChildrenInternal(
+ newChildren: IndexedSeq[LogicalPlan]): CreateViewCommand = {
+ assert(!isAnalyzed)
+ copy(plan = newChildren.head)
+ }
+
+ override def innerChildren: Seq[QueryPlan[_]] = Seq(plan)
+
+ // `plan` needs to be analyzed, but shouldn't be optimized so that caching works correctly.
+ override def childrenToAnalyze: Seq[LogicalPlan] = plan :: Nil
+
+ def markAsAnalyzed(): LogicalPlan = copy(isAnalyzed = true)
if (viewType == PersistedView) {
require(originalText.isDefined, "'originalText' must be provided to create permanent view")
@@ -96,10 +108,10 @@ case class CreateViewCommand(
}
override def run(sparkSession: SparkSession): Seq[Row] = {
- // If the plan cannot be analyzed, throw an exception and don't proceed.
- val qe = sparkSession.sessionState.executePlan(child)
- qe.assertAnalyzed()
- val analyzedPlan = qe.analyzed
+ if (!isAnalyzed) {
+ throw new AnalysisException("The logical plan that represents the view is not analyzed.")
+ }
+ val analyzedPlan = plan
if (userSpecifiedColumns.nonEmpty &&
userSpecifiedColumns.length != analyzedPlan.output.length) {
@@ -233,12 +245,23 @@ case class CreateViewCommand(
case class AlterViewAsCommand(
name: TableIdentifier,
originalText: String,
- query: LogicalPlan) extends LeafRunnableCommand {
+ query: LogicalPlan,
+ isAnalyzed: Boolean = false) extends RunnableCommand with AnalysisOnlyCommand {
import ViewHelper._
+ override protected def withNewChildrenInternal(
+ newChildren: IndexedSeq[LogicalPlan]): AlterViewAsCommand = {
+ assert(!isAnalyzed)
+ copy(query = newChildren.head)
+ }
+
override def innerChildren: Seq[QueryPlan[_]] = Seq(query)
+ override def childrenToAnalyze: Seq[LogicalPlan] = query :: Nil
+
+ def markAsAnalyzed(): LogicalPlan = copy(isAnalyzed = true)
+
override def run(session: SparkSession): Seq[Row] = {
if (session.sessionState.catalog.isTempView(name)) {
alterTemporaryView(session, query)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala
index 5b4b9e3..ac97e57 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala
@@ -94,19 +94,19 @@ case class CacheTableAsSelectExec(
override lazy val relationName: String = tempViewName
override lazy val planToCache: LogicalPlan = {
- Dataset.ofRows(sparkSession,
- CreateViewCommand(
- name = TableIdentifier(tempViewName),
- userSpecifiedColumns = Nil,
- comment = None,
- properties = Map.empty,
- originalText = Some(originalText),
- child = query,
- allowExisting = false,
- replace = false,
- viewType = LocalTempView
- )
- )
+ CreateViewCommand(
+ name = TableIdentifier(tempViewName),
+ userSpecifiedColumns = Nil,
+ comment = None,
+ properties = Map.empty,
+ originalText = Some(originalText),
+ plan = query,
+ allowExisting = false,
+ replace = false,
+ viewType = LocalTempView,
+ isAnalyzed = true
+ ).run(sparkSession)
+
dataFrameForCachedPlan.logicalPlan
}
diff --git a/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out b/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out
index ddfab99..357445a 100644
--- a/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out
@@ -913,7 +913,7 @@ Execute CreateViewCommand (1)
Output: []
(2) CreateViewCommand
-Arguments: `default`.`explain_view`, SELECT key, val FROM explain_temp1, false, false, PersistedView
+Arguments: `default`.`explain_view`, SELECT key, val FROM explain_temp1, false, false, PersistedView, true
(3) LogicalRelation
Arguments: parquet, [key#x, val#x], CatalogTable(
diff --git a/sql/core/src/test/resources/sql-tests/results/explain.sql.out b/sql/core/src/test/resources/sql-tests/results/explain.sql.out
index 1f7f8f6..3d00872 100644
--- a/sql/core/src/test/resources/sql-tests/results/explain.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/explain.sql.out
@@ -858,7 +858,7 @@ Execute CreateViewCommand (1)
Output: []
(2) CreateViewCommand
-Arguments: `default`.`explain_view`, SELECT key, val FROM explain_temp1, false, false, PersistedView
+Arguments: `default`.`explain_view`, SELECT key, val FROM explain_temp1, false, false, PersistedView, true
(3) LogicalRelation
Arguments: parquet, [key#x, val#x], CatalogTable(