This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new da84f81b1e2 [SPARK-44355][SQL] Move WithCTE into command queries
da84f81b1e2 is described below

commit da84f81b1e2ed9386cc5534141f53273ea4cb57d
Author: Max Gekk <max.g...@gmail.com>
AuthorDate: Tue Jul 25 21:53:18 2023 +0800

    [SPARK-44355][SQL] Move WithCTE into command queries
    
    This PR is based on https://github.com/apache/spark/pull/42022 and fixes
    its tests, as the original PR author is on vacation.
    
    ### What changes were proposed in this pull request?
    In this PR, I propose to add a new trait, `CTEInChildren`, and mix it into
    the commands that should have `WithCTE` on top of their children (queries)
    instead of on top of the main query. I also modified the `CTESubstitution`
    rule and removed the special handling of `Command`s and similar nodes.
    After the changes, `Command`, `ParsedStatement` and `InsertIntoDir` are
    handled in the same way as other queries, by referring to CTE defs. The
    only difference is that `WithCTE` is not placed on top of the main quer [...]
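
    For example, the analyzed plan of a CTAS query now keeps the command at the
    root and places `WithCTE` on top of its query, as in this excerpt from the
    new `cte-command.sql.out` golden file added below:
    ```
    -- !query
    CREATE TABLE cte_tbl USING csv AS WITH s AS (SELECT 42 AS col) SELECT * FROM s
    -- !query analysis
    CreateDataSourceTableAsSelectCommand `spark_catalog`.`default`.`cte_tbl`, ErrorIfExists, [col]
       +- WithCTE
          :- CTERelationDef xxxx, false
          :  +- SubqueryAlias s
          :     +- Project [42 AS col#x]
          :        +- OneRowRelation
          +- Project [col#x]
             +- SubqueryAlias s
                +- CTERelationRef xxxx, true, [col#x]
    ```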
    
    Closes #41922
    
    ### Why are the changes needed?
    To improve code maintenance. Right now the CTE resolution code path
    diverges: queries containing commands go through the CTE inlining code
    path, while non-command queries go through the CTE def (`CTERelationDef`)
    code path.
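
    As a hypothetical illustration (the query below is not part of this patch):
    inlining copies the CTE body into every reference, so a nondeterministic
    CTE may be evaluated more than once, which is why the new escape-hatch
    config `spark.sql.legacy.inlineCTEInCommands` added here defaults to false:
    ```
    -- Under the legacy inlining behavior, the body of `r` is duplicated into
    -- both branches, so the two references may observe different rand() values.
    WITH r AS (SELECT rand() AS v)
    INSERT INTO cte_tbl SELECT v FROM r UNION ALL SELECT v FROM r;
    ```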
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    By running the new tests:
    ```
    $ build/sbt "test:testOnly *InsertSuite"
    ```
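
    The new golden-file test `cte-command.sql` added in this patch can also be
    run via `SQLQueryTestSuite`, e.g.:
    ```
    $ build/sbt "sql/testOnly *SQLQueryTestSuite -- -z cte-command.sql"
    ```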
    
    Closes #42036 from cloud-fan/help.
    
    Lead-authored-by: Max Gekk <max.g...@gmail.com>
    Co-authored-by: Wenchen Fan <wenc...@databricks.com>
    Co-authored-by: Wenchen Fan <cloud0...@gmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../sql/catalyst/analysis/CTESubstitution.scala    |  85 ++++++++----
 .../plans/logical/basicLogicalOperators.scala      |  12 +-
 .../sql/catalyst/plans/logical/statements.scala    |   2 +-
 .../sql/catalyst/plans/logical/v2Commands.scala    |  23 +++-
 .../org/apache/spark/sql/internal/SQLConf.scala    |   9 ++
 .../sql/execution/command/DataWritingCommand.scala |   4 +-
 .../command/InsertIntoDataSourceDirCommand.scala   |   8 +-
 .../execution/command/createDataSourceTables.scala |   8 +-
 .../spark/sql/execution/command/tables.scala       |   6 +-
 .../apache/spark/sql/execution/command/views.scala |  14 +-
 .../datasources/InsertIntoDataSourceCommand.scala  |   8 +-
 .../datasources/SaveIntoDataSourceCommand.scala    |   8 +-
 .../ansi/double-quoted-identifiers-enabled.sql.out |  14 +-
 .../sql-tests/analyzer-results/cte-command.sql.out | 152 +++++++++++++++++++++
 .../analyzer-results/postgreSQL/with.sql.out       |  12 +-
 .../resources/sql-tests/inputs/cte-command.sql     |  33 +++++
 .../sql-tests/results/cte-command.sql.out          | 121 ++++++++++++++++
 .../execution/CreateHiveTableAsSelectCommand.scala |   8 +-
 18 files changed, 473 insertions(+), 54 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala
index 4e3234f9c0d..cd174f71892 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala
@@ -20,18 +20,18 @@ package org.apache.spark.sql.catalyst.analysis
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
-import org.apache.spark.sql.catalyst.plans.logical.{Command, CTERelationDef, CTERelationRef, InsertIntoDir, LogicalPlan, ParsedStatement, SubqueryAlias, UnresolvedWith, WithCTE}
+import org.apache.spark.sql.catalyst.plans.logical.{Command, CTEInChildren, CTERelationDef, CTERelationRef, InsertIntoDir, LogicalPlan, ParsedStatement, SubqueryAlias, UnresolvedWith, WithCTE}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.TreePattern._
 import org.apache.spark.sql.catalyst.util.TypeUtils._
 import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.{LEGACY_CTE_PRECEDENCE_POLICY, LegacyBehaviorPolicy}
 
 /**
  * Analyze WITH nodes and substitute child plan with CTE references or CTE definitions depending
  * on the conditions below:
- * 1. If in legacy mode, or if the query is a SQL command or DML statement, replace with CTE
- *    definitions, i.e., inline CTEs.
+ * 1. If in legacy mode, replace with CTE definitions, i.e., inline CTEs.
  * 2. Otherwise, replace with CTE references `CTERelationRef`s. The decision to inline or not
  *    inline will be made later by the rule `InlineCTE` after query analysis.
  *
@@ -46,42 +46,62 @@ import org.apache.spark.sql.internal.SQLConf.{LEGACY_CTE_PRECEDENCE_POLICY, Lega
  * dependency for any valid CTE query (i.e., given CTE definitions A and B with B referencing A,
  * A is guaranteed to appear before B). Otherwise, it must be an invalid user query, and an
  * analysis exception will be thrown later by relation resolving rules.
+ *
+ * If the query is a SQL command or DML statement (extends `CTEInChildren`),
+ * place `WithCTE` into their children.
  */
 object CTESubstitution extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = {
     if (!plan.containsPattern(UNRESOLVED_WITH)) {
       return plan
     }
-    val isCommand = plan.exists {
-      case _: Command | _: ParsedStatement | _: InsertIntoDir => true
-      case _ => false
+
+    val commands = plan.collect {
+      case c @ (_: Command | _: ParsedStatement | _: InsertIntoDir) => c
     }
+    val forceInline = if (commands.length == 1) {
+      if (conf.getConf(SQLConf.LEGACY_INLINE_CTE_IN_COMMANDS)) {
+        // The legacy behavior always inlines the CTE relations for queries in commands.
+        true
+      } else {
+        // If there is only one command and it's `CTEInChildren`, we can resolve
+        // CTE normally and don't need to force inline.
+        !commands.head.isInstanceOf[CTEInChildren]
+      }
+    } else if (commands.length > 1) {
+      // This can happen with the multi-insert statement. We should fall back to
+      // the legacy behavior.
+      true
+    } else {
+      false
+    }
+
     val cteDefs = ArrayBuffer.empty[CTERelationDef]
     val (substituted, firstSubstituted) =
       LegacyBehaviorPolicy.withName(conf.getConf(LEGACY_CTE_PRECEDENCE_POLICY)) match {
         case LegacyBehaviorPolicy.EXCEPTION =>
           assertNoNameConflictsInCTE(plan)
-          traverseAndSubstituteCTE(plan, isCommand, Seq.empty, cteDefs)
+          traverseAndSubstituteCTE(plan, forceInline, Seq.empty, cteDefs)
         case LegacyBehaviorPolicy.LEGACY =>
           (legacyTraverseAndSubstituteCTE(plan, cteDefs), None)
         case LegacyBehaviorPolicy.CORRECTED =>
-          traverseAndSubstituteCTE(plan, isCommand, Seq.empty, cteDefs)
+          traverseAndSubstituteCTE(plan, forceInline, Seq.empty, cteDefs)
     }
     if (cteDefs.isEmpty) {
       substituted
     } else if (substituted eq firstSubstituted.get) {
-      WithCTE(substituted, cteDefs.toSeq)
+      withCTEDefs(substituted, cteDefs.toSeq)
     } else {
       var done = false
       substituted.resolveOperatorsWithPruning(_ => !done) {
         case p if p eq firstSubstituted.get =>
           // `firstSubstituted` is the parent of all other CTEs (if any).
           done = true
-          WithCTE(p, cteDefs.toSeq)
+          withCTEDefs(p, cteDefs.toSeq)
         case p if p.children.count(_.containsPattern(CTE)) > 1 =>
           // This is the first common parent of all CTEs.
           done = true
-          WithCTE(p, cteDefs.toSeq)
+          withCTEDefs(p, cteDefs.toSeq)
       }
     }
   }
@@ -131,7 +151,7 @@ object CTESubstitution extends Rule[LogicalPlan] {
     plan.resolveOperatorsUp {
       case UnresolvedWith(child, relations) =>
         val resolvedCTERelations =
-          resolveCTERelations(relations, isLegacy = true, isCommand = false, Seq.empty, cteDefs)
+          resolveCTERelations(relations, isLegacy = true, forceInline = false, Seq.empty, cteDefs)
         substituteCTE(child, alwaysInline = true, resolvedCTERelations)
     }
   }
@@ -168,7 +188,7 @@ object CTESubstitution extends Rule[LogicalPlan] {
    *     SELECT * FROM t
    *   )
    * @param plan the plan to be traversed
-   * @param isCommand if this is a command
+   * @param forceInline always inline the CTE relations if this is true
    * @param outerCTEDefs already resolved outer CTE definitions with names
    * @param cteDefs all accumulated CTE definitions
   * @return the plan where CTE substitution is applied and optionally the last substituted `With`
@@ -176,7 +196,7 @@ object CTESubstitution extends Rule[LogicalPlan] {
    */
   private def traverseAndSubstituteCTE(
       plan: LogicalPlan,
-      isCommand: Boolean,
+      forceInline: Boolean,
       outerCTEDefs: Seq[(String, CTERelationDef)],
       cteDefs: ArrayBuffer[CTERelationDef]): (LogicalPlan, Option[LogicalPlan]) = {
     var firstSubstituted: Option[LogicalPlan] = None
@@ -184,11 +204,11 @@ object CTESubstitution extends Rule[LogicalPlan] {
         _.containsAnyPattern(UNRESOLVED_WITH, PLAN_EXPRESSION)) {
       case UnresolvedWith(child: LogicalPlan, relations) =>
         val resolvedCTERelations =
-          resolveCTERelations(relations, isLegacy = false, isCommand, outerCTEDefs, cteDefs) ++
+          resolveCTERelations(relations, isLegacy = false, forceInline, outerCTEDefs, cteDefs) ++
             outerCTEDefs
         val substituted = substituteCTE(
-          traverseAndSubstituteCTE(child, isCommand, resolvedCTERelations, cteDefs)._1,
-          isCommand,
+          traverseAndSubstituteCTE(child, forceInline, resolvedCTERelations, cteDefs)._1,
+          forceInline,
           resolvedCTERelations)
         if (firstSubstituted.isEmpty) {
           firstSubstituted = Some(substituted)
@@ -206,10 +226,11 @@ object CTESubstitution extends Rule[LogicalPlan] {
   private def resolveCTERelations(
       relations: Seq[(String, SubqueryAlias)],
       isLegacy: Boolean,
-      isCommand: Boolean,
+      forceInline: Boolean,
       outerCTEDefs: Seq[(String, CTERelationDef)],
       cteDefs: ArrayBuffer[CTERelationDef]): Seq[(String, CTERelationDef)] = {
-    var resolvedCTERelations = if (isLegacy || isCommand) {
+    val alwaysInline = isLegacy || forceInline
+    var resolvedCTERelations = if (alwaysInline) {
       Seq.empty
     } else {
       outerCTEDefs
@@ -232,12 +253,12 @@ object CTESubstitution extends Rule[LogicalPlan] {
         //   WITH t3 AS (SELECT * FROM t1)
         // )
         // t3 should resolve the t1 to `SELECT 2` instead of `SELECT 1`.
-        traverseAndSubstituteCTE(relation, isCommand, resolvedCTERelations, cteDefs)._1
+        traverseAndSubstituteCTE(relation, forceInline, resolvedCTERelations, cteDefs)._1
       }
       // CTE definition can reference a previous one
-      val substituted = substituteCTE(innerCTEResolved, isLegacy || isCommand, resolvedCTERelations)
+      val substituted = substituteCTE(innerCTEResolved, alwaysInline, resolvedCTERelations)
       val cteRelation = CTERelationDef(substituted)
-      if (!(isLegacy || isCommand)) {
+      if (!alwaysInline) {
         cteDefs += cteRelation
       }
       // Prepending new CTEs makes sure that those have higher priority over outer ones.
@@ -249,7 +270,7 @@ object CTESubstitution extends Rule[LogicalPlan] {
   private def substituteCTE(
       plan: LogicalPlan,
       alwaysInline: Boolean,
-      cteRelations: Seq[(String, CTERelationDef)]): LogicalPlan =
+      cteRelations: Seq[(String, CTERelationDef)]): LogicalPlan = {
     plan.resolveOperatorsUpWithPruning(
        _.containsAnyPattern(RELATION_TIME_TRAVEL, UNRESOLVED_RELATION, PLAN_EXPRESSION)) {
       case RelationTimeTravel(UnresolvedRelation(Seq(table), _, _), _, _)
@@ -273,4 +294,22 @@ object CTESubstitution extends Rule[LogicalPlan] {
            e.withNewPlan(apply(substituteCTE(e.plan, alwaysInline, cteRelations)))
         }
     }
+  }
+
+  /**
+   * For commands which extend `CTEInChildren`, we should place the `WithCTE` node on its
+   * children. There are two reasons:
+   *  1. Some rules will pattern match the root command nodes, and we should keep command
+   *     as the root node to not break them.
+   *  2. `Dataset` eagerly executes the commands inside a query plan. For example,
+   *     sql("WITH v ... CREATE TABLE t AS SELECT * FROM v") will create the table instead of just
+   *     analyzing the command. However, the CTE references inside commands will be invalid if we
+   *     execute the command alone, as the CTE definitions are outside of the command.
+   */
+  private def withCTEDefs(p: LogicalPlan, cteDefs: Seq[CTERelationDef]): LogicalPlan = {
+    p match {
+      case c: CTEInChildren => c.withCTEDefs(cteDefs)
+      case _ => WithCTE(p, cteDefs)
+    }
+  }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 57e4b31dbe0..efb7dbb44ef 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -684,7 +684,7 @@ case class InsertIntoDir(
     provider: Option[String],
     child: LogicalPlan,
     overwrite: Boolean = true)
-  extends UnaryNode {
+  extends UnaryNode with CTEInChildren {
 
   override def output: Seq[Attribute] = Seq.empty
   override def metadataOutput: Seq[Attribute] = Nil
@@ -894,6 +894,16 @@ case class WithCTE(plan: LogicalPlan, cteDefs: Seq[CTERelationDef]) extends Logi
   }
 }
 
+/**
+ * The logical node which is able to place the `WithCTE` node on its children.
+ */
+trait CTEInChildren extends LogicalPlan {
+  def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan = {
+    withNewChildren(children.map(WithCTE(_, cteDefs)))
+  }
+}
+
+
 case class WithWindowDefinition(
     windowDefinitions: Map[String, WindowSpecDefinition],
     child: LogicalPlan) extends UnaryNode {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
index 669750ee448..9efc3b13bc2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
@@ -40,7 +40,7 @@ import org.apache.spark.sql.types.DataType
  * Parsed logical plans are located in Catalyst so that as much SQL parsing logic as possible is be
  * kept in a [[org.apache.spark.sql.catalyst.parser.AbstractSqlParser]].
  */
-abstract class ParsedStatement extends LogicalPlan {
+abstract class ParsedStatement extends LogicalPlan with CTEInChildren {
   // Redact properties and options when parsed nodes are used by generic methods like toString
   override def productIterator: Iterator[Any] = super.productIterator.map {
     case mapArg: Map[_, _] => conf.redactOptions(mapArg)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
index 739ffa487e3..5c83da1a96a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
@@ -46,7 +46,7 @@ trait KeepAnalyzedQuery extends Command {
 /**
  * Base trait for DataSourceV2 write commands
  */
-trait V2WriteCommand extends UnaryCommand with KeepAnalyzedQuery {
+trait V2WriteCommand extends UnaryCommand with KeepAnalyzedQuery with CTEInChildren {
   def table: NamedRelation
   def query: LogicalPlan
   def isByName: Boolean
@@ -392,9 +392,16 @@ case class WriteDelta(
   }
 }
 
-trait V2CreateTableAsSelectPlan extends V2CreateTablePlan with AnalysisOnlyCommand {
+trait V2CreateTableAsSelectPlan
+  extends V2CreateTablePlan
+    with AnalysisOnlyCommand
+    with CTEInChildren {
   def query: LogicalPlan
 
+  override def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan = {
+    withNameAndQuery(newName = name, newQuery = WithCTE(query, cteDefs))
+  }
+
   override lazy val resolved: Boolean = childrenResolved && {
     // the table schema is created from the query schema, so the only resolution needed is to check
     // that the columns referenced by the table's partitioning exist in the query schema
@@ -1234,12 +1241,16 @@ case class RepairTable(
 case class AlterViewAs(
     child: LogicalPlan,
     originalText: String,
-    query: LogicalPlan) extends BinaryCommand {
+    query: LogicalPlan) extends BinaryCommand with CTEInChildren {
   override def left: LogicalPlan = child
   override def right: LogicalPlan = query
   override protected def withNewChildrenInternal(
       newLeft: LogicalPlan, newRight: LogicalPlan): LogicalPlan =
     copy(child = newLeft, query = newRight)
+
+  override def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan = {
+    withNewChildren(Seq(child, WithCTE(query, cteDefs)))
+  }
 }
 
 /**
@@ -1253,12 +1264,16 @@ case class CreateView(
     originalText: Option[String],
     query: LogicalPlan,
     allowExisting: Boolean,
-    replace: Boolean) extends BinaryCommand {
+    replace: Boolean) extends BinaryCommand with CTEInChildren {
   override def left: LogicalPlan = child
   override def right: LogicalPlan = query
   override protected def withNewChildrenInternal(
       newLeft: LogicalPlan, newRight: LogicalPlan): LogicalPlan =
     copy(child = newLeft, query = newRight)
+
+  override def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan = {
+    withNewChildren(Seq(child, WithCTE(query, cteDefs)))
+  }
 }
 
 /**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 00bb6f77ef3..011dbd10f5c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -3766,6 +3766,15 @@ object SQLConf {
     .checkValues(LegacyBehaviorPolicy.values.map(_.toString))
     .createWithDefault(LegacyBehaviorPolicy.EXCEPTION.toString)
 
+  val LEGACY_INLINE_CTE_IN_COMMANDS = buildConf("spark.sql.legacy.inlineCTEInCommands")
+    .internal()
+    .doc("If true, always inline the CTE relations for the queries in commands. This is the " +
+      "legacy behavior which may produce incorrect results because Spark may evaluate a CTE " +
+      "relation more than once, even if it's nondeterministic.")
+    .version("4.0.0")
+    .booleanConf
+    .createWithDefault(false)
+
   val LEGACY_TIME_PARSER_POLICY = buildConf("spark.sql.legacy.timeParserPolicy")
     .internal()
     .doc("When LEGACY, java.text.SimpleDateFormat is used for formatting and parsing " +
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
index 338ce8cac42..592ae04a055 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
@@ -24,7 +24,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.spark.SparkContext
 import org.apache.spark.sql.{Row, SaveMode, SparkSession}
 import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryCommand}
+import org.apache.spark.sql.catalyst.plans.logical.{CTEInChildren, LogicalPlan, UnaryCommand}
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
 import org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker
@@ -35,7 +35,7 @@ import org.apache.spark.util.SerializableConfiguration
 /**
  * A special `Command` which writes data out and updates metrics.
  */
-trait DataWritingCommand extends UnaryCommand {
+trait DataWritingCommand extends UnaryCommand with CTEInChildren {
   /**
    * The input query plan that produces the data to be written.
    * IMPORTANT: the input query plan MUST be analyzed, so that we can carry its output columns
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/InsertIntoDataSourceDirCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/InsertIntoDataSourceDirCommand.scala
index 35c8bec3716..67d38b28c83 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/InsertIntoDataSourceDirCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/InsertIntoDataSourceDirCommand.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.command
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog._
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.{CTEInChildren, CTERelationDef, LogicalPlan, WithCTE}
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.datasources._
 
@@ -42,7 +42,7 @@ case class InsertIntoDataSourceDirCommand(
     storage: CatalogStorageFormat,
     provider: String,
     query: LogicalPlan,
-    overwrite: Boolean) extends LeafRunnableCommand {
+    overwrite: Boolean) extends LeafRunnableCommand with CTEInChildren {
 
   override def innerChildren: Seq[LogicalPlan] = query :: Nil
 
@@ -76,4 +76,8 @@ case class InsertIntoDataSourceDirCommand(
 
     Seq.empty[Row]
   }
+
+  override def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan = {
+    copy(query = WithCTE(query, cteDefs))
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 3848d550515..54e8181c56c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -21,7 +21,7 @@ import java.net.URI
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog._
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.{CTEInChildren, CTERelationDef, LogicalPlan, WithCTE}
 import org.apache.spark.sql.catalyst.util.{removeInternalMetadata, CharVarcharUtils}
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.CommandExecutionMode
@@ -141,7 +141,7 @@ case class CreateDataSourceTableAsSelectCommand(
     mode: SaveMode,
     query: LogicalPlan,
     outputColumnNames: Seq[String])
-  extends LeafRunnableCommand {
+  extends LeafRunnableCommand with CTEInChildren {
   assert(query.resolved)
   override def innerChildren: Seq[LogicalPlan] = query :: Nil
 
@@ -233,4 +233,8 @@ case class CreateDataSourceTableAsSelectCommand(
         throw ex
     }
   }
+
+  override def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan = {
+    copy(query = WithCTE(query, cteDefs))
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 351f6d5456d..88e940ffdc7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -735,7 +735,7 @@ case class DescribeTableCommand(
  * 7. Common table expressions (CTEs)
  */
 case class DescribeQueryCommand(queryText: String, plan: LogicalPlan)
-  extends DescribeCommandBase {
+  extends DescribeCommandBase with CTEInChildren {
 
   override val output = DescribeCommandSchema.describeTableAttributes()
 
@@ -747,6 +747,10 @@ case class DescribeQueryCommand(queryText: String, plan: LogicalPlan)
     describeSchema(queryExecution.analyzed.schema, result, header = false)
     result.toSeq
   }
+
+  override def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan = {
+    copy(plan = WithCTE(plan, cteDefs))
+  }
 }
 
 /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
index 3718794ea59..8ac982b9bdd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.{SQLConfHelper, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.{AnalysisContext, GlobalTempView, LocalTempView, ViewType}
 import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType, TemporaryViewRelation}
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, SubqueryExpression}
-import org.apache.spark.sql.catalyst.plans.logical.{AnalysisOnlyCommand, LogicalPlan, Project, View}
+import org.apache.spark.sql.catalyst.plans.logical.{AnalysisOnlyCommand, CTEInChildren, CTERelationDef, LogicalPlan, Project, View, WithCTE}
 import org.apache.spark.sql.catalyst.util.CharVarcharUtils
 import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.NamespaceHelper
 import org.apache.spark.sql.errors.QueryCompilationErrors
@@ -69,7 +69,7 @@ case class CreateViewCommand(
     viewType: ViewType,
     isAnalyzed: Boolean = false,
     referredTempFunctions: Seq[String] = Seq.empty)
-  extends RunnableCommand with AnalysisOnlyCommand {
+  extends RunnableCommand with AnalysisOnlyCommand with CTEInChildren {
 
   import ViewHelper._
 
@@ -215,6 +215,10 @@ case class CreateViewCommand(
       comment = comment
     )
   }
+
+  override def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan = {
+    copy(plan = WithCTE(plan, cteDefs))
+  }
 }
 
 /**
@@ -235,7 +239,7 @@ case class AlterViewAsCommand(
     query: LogicalPlan,
     isAnalyzed: Boolean = false,
     referredTempFunctions: Seq[String] = Seq.empty)
-  extends RunnableCommand with AnalysisOnlyCommand {
+  extends RunnableCommand with AnalysisOnlyCommand with CTEInChildren {
 
   import ViewHelper._
 
@@ -307,6 +311,10 @@ case class AlterViewAsCommand(
 
     session.sessionState.catalog.alterTable(updatedViewMeta)
   }
+
+  override def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan = {
+    copy(query = WithCTE(query, cteDefs))
+  }
 }
 
 /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala
index 789b1d714fc..128f6acdeaa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.{CTEInChildren, CTERelationDef, LogicalPlan, WithCTE}
 import org.apache.spark.sql.execution.command.LeafRunnableCommand
 import org.apache.spark.sql.sources.InsertableRelation
 
@@ -31,7 +31,7 @@ case class InsertIntoDataSourceCommand(
     logicalRelation: LogicalRelation,
     query: LogicalPlan,
     overwrite: Boolean)
-  extends LeafRunnableCommand {
+  extends LeafRunnableCommand with CTEInChildren {
 
   override def innerChildren: Seq[QueryPlan[_]] = Seq(query)
 
@@ -47,4 +47,8 @@ case class InsertIntoDataSourceCommand(
 
     Seq.empty[Row]
   }
+
+  override def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan = {
+    copy(query = WithCTE(query, cteDefs))
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SaveIntoDataSourceCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SaveIntoDataSourceCommand.scala
index 666ae9b5c6f..5423232db42 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SaveIntoDataSourceCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SaveIntoDataSourceCommand.scala
@@ -21,7 +21,7 @@ import scala.util.control.NonFatal
 
 import org.apache.spark.sql.{Dataset, Row, SaveMode, SparkSession}
 import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.{CTEInChildren, CTERelationDef, LogicalPlan, WithCTE}
 import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
 import org.apache.spark.sql.execution.command.LeafRunnableCommand
 import org.apache.spark.sql.sources.CreatableRelationProvider
@@ -39,7 +39,7 @@ case class SaveIntoDataSourceCommand(
     query: LogicalPlan,
     dataSource: CreatableRelationProvider,
     options: Map[String, String],
-    mode: SaveMode) extends LeafRunnableCommand {
+    mode: SaveMode) extends LeafRunnableCommand with CTEInChildren {
 
   override def innerChildren: Seq[QueryPlan[_]] = Seq(query)
 
@@ -68,4 +68,8 @@ case class SaveIntoDataSourceCommand(
   override def clone(): LogicalPlan = {
     SaveIntoDataSourceCommand(query.clone(), dataSource, options, mode)
   }
+
+  override def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan = {
+    copy(query = WithCTE(query, cteDefs))
+  }
 }
diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/ansi/double-quoted-identifiers-enabled.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/ansi/double-quoted-identifiers-enabled.sql.out
index 2a6bcce99d1..a155bccd091 100644
--- a/sql/core/src/test/resources/sql-tests/analyzer-results/ansi/double-quoted-identifiers-enabled.sql.out
+++ b/sql/core/src/test/resources/sql-tests/analyzer-results/ansi/double-quoted-identifiers-enabled.sql.out
@@ -410,11 +410,15 @@ CREATE TEMPORARY VIEW "myview"("c1") AS
   WITH "v"("a") AS (SELECT 1) SELECT "a" FROM "v"
 -- !query analysis
 CreateViewCommand `myview`, [(c1,None)], WITH "v"("a") AS (SELECT 1) SELECT "a" FROM "v", false, false, LocalTempView, true
-   +- Project [a#x]
-      +- SubqueryAlias v
-         +- Project [1#x AS a#x]
-            +- Project [1 AS 1#x]
-               +- OneRowRelation
+   +- WithCTE
+      :- CTERelationDef xxxx, false
+      :  +- SubqueryAlias v
+      :     +- Project [1#x AS a#x]
+      :        +- Project [1 AS 1#x]
+      :           +- OneRowRelation
+      +- Project [a#x]
+         +- SubqueryAlias v
+            +- CTERelationRef xxxx, true, [a#x]
 
 
 -- !query
diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/cte-command.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/cte-command.sql.out
new file mode 100644
index 00000000000..93dab5aaa6d
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/analyzer-results/cte-command.sql.out
@@ -0,0 +1,152 @@
+-- Automatically generated by SQLQueryTestSuite
+-- !query
+CREATE TABLE cte_tbl USING csv AS WITH s AS (SELECT 42 AS col) SELECT * FROM s
+-- !query analysis
+CreateDataSourceTableAsSelectCommand `spark_catalog`.`default`.`cte_tbl`, ErrorIfExists, [col]
+   +- WithCTE
+      :- CTERelationDef xxxx, false
+      :  +- SubqueryAlias s
+      :     +- Project [42 AS col#x]
+      :        +- OneRowRelation
+      +- Project [col#x]
+         +- SubqueryAlias s
+            +- CTERelationRef xxxx, true, [col#x]
+
+
+-- !query
+SELECT * FROM cte_tbl
+-- !query analysis
+Project [col#x]
++- SubqueryAlias spark_catalog.default.cte_tbl
+   +- Relation spark_catalog.default.cte_tbl[col#x] csv
+
+
+-- !query
+CREATE TEMPORARY VIEW cte_view AS WITH s AS (SELECT 42 AS col) SELECT * FROM s
+-- !query analysis
+CreateViewCommand `cte_view`, WITH s AS (SELECT 42 AS col) SELECT * FROM s, false, false, LocalTempView, true
+   +- WithCTE
+      :- CTERelationDef xxxx, false
+      :  +- SubqueryAlias s
+      :     +- Project [42 AS col#x]
+      :        +- OneRowRelation
+      +- Project [col#x]
+         +- SubqueryAlias s
+            +- CTERelationRef xxxx, true, [col#x]
+
+
+-- !query
+SELECT * FROM cte_view
+-- !query analysis
+Project [col#x]
++- SubqueryAlias cte_view
+   +- View (`cte_view`, [col#x])
+      +- Project [cast(col#x as int) AS col#x]
+         +- WithCTE
+            :- CTERelationDef xxxx, false
+            :  +- SubqueryAlias s
+            :     +- Project [42 AS col#x]
+            :        +- OneRowRelation
+            +- Project [col#x]
+               +- SubqueryAlias s
+                  +- CTERelationRef xxxx, true, [col#x]
+
+
+-- !query
+WITH s AS (SELECT 43 AS col)
+INSERT INTO cte_tbl SELECT * FROM S
+-- !query analysis
+InsertIntoHadoopFsRelationCommand file:[not included in comparison]/{warehouse_dir}/cte_tbl, false, CSV, [path=file:[not included in comparison]/{warehouse_dir}/cte_tbl], Append, `spark_catalog`.`default`.`cte_tbl`, org.apache.spark.sql.execution.datasources.InMemoryFileIndex(file:[not included in comparison]/{warehouse_dir}/cte_tbl), [col]
++- WithCTE
+   :- CTERelationDef xxxx, false
+   :  +- SubqueryAlias s
+   :     +- Project [43 AS col#x]
+   :        +- OneRowRelation
+   +- Project [col#x]
+      +- SubqueryAlias S
+         +- CTERelationRef xxxx, true, [col#x]
+
+
+-- !query
+SELECT * FROM cte_tbl
+-- !query analysis
+Project [col#x]
++- SubqueryAlias spark_catalog.default.cte_tbl
+   +- Relation spark_catalog.default.cte_tbl[col#x] csv
+
+
+-- !query
+INSERT INTO cte_tbl WITH s AS (SELECT 44 AS col) SELECT * FROM s
+-- !query analysis
+InsertIntoHadoopFsRelationCommand file:[not included in comparison]/{warehouse_dir}/cte_tbl, false, CSV, [path=file:[not included in comparison]/{warehouse_dir}/cte_tbl], Append, `spark_catalog`.`default`.`cte_tbl`, org.apache.spark.sql.execution.datasources.InMemoryFileIndex(file:[not included in comparison]/{warehouse_dir}/cte_tbl), [col]
++- WithCTE
+   :- CTERelationDef xxxx, false
+   :  +- SubqueryAlias s
+   :     +- Project [44 AS col#x]
+   :        +- OneRowRelation
+   +- Project [col#x]
+      +- SubqueryAlias s
+         +- CTERelationRef xxxx, true, [col#x]
+
+
+-- !query
+SELECT * FROM cte_tbl
+-- !query analysis
+Project [col#x]
++- SubqueryAlias spark_catalog.default.cte_tbl
+   +- Relation spark_catalog.default.cte_tbl[col#x] csv
+
+
+-- !query
+CREATE TABLE cte_tbl2 (col INT) USING csv
+-- !query analysis
+CreateDataSourceTableCommand `spark_catalog`.`default`.`cte_tbl2`, false
+
+
+-- !query
+WITH s AS (SELECT 45 AS col)
+FROM s
+INSERT INTO cte_tbl SELECT col
+INSERT INTO cte_tbl2 SELECT col
+-- !query analysis
+Union false, false
+:- InsertIntoHadoopFsRelationCommand file:[not included in comparison]/{warehouse_dir}/cte_tbl, false, CSV, [path=file:[not included in comparison]/{warehouse_dir}/cte_tbl], Append, `spark_catalog`.`default`.`cte_tbl`, org.apache.spark.sql.execution.datasources.InMemoryFileIndex(file:[not included in comparison]/{warehouse_dir}/cte_tbl), [col]
+:  +- Project [col#x]
+:     +- SubqueryAlias s
+:        +- Project [45 AS col#x]
+:           +- OneRowRelation
++- InsertIntoHadoopFsRelationCommand file:[not included in comparison]/{warehouse_dir}/cte_tbl2, false, CSV, [path=file:[not included in comparison]/{warehouse_dir}/cte_tbl2], Append, `spark_catalog`.`default`.`cte_tbl2`, org.apache.spark.sql.execution.datasources.InMemoryFileIndex(file:[not included in comparison]/{warehouse_dir}/cte_tbl2), [col]
+   +- Project [col#x]
+      +- SubqueryAlias s
+         +- Project [45 AS col#x]
+            +- OneRowRelation
+
+
+-- !query
+SELECT * FROM cte_tbl
+-- !query analysis
+Project [col#x]
++- SubqueryAlias spark_catalog.default.cte_tbl
+   +- Relation spark_catalog.default.cte_tbl[col#x] csv
+
+
+-- !query
+SELECT * FROM cte_tbl2
+-- !query analysis
+Project [col#x]
++- SubqueryAlias spark_catalog.default.cte_tbl2
+   +- Relation spark_catalog.default.cte_tbl2[col#x] csv
+
+
+-- !query
+DROP TABLE cte_tbl
+-- !query analysis
+DropTable false, false
++- ResolvedIdentifier V2SessionCatalog(spark_catalog), default.cte_tbl
+
+
+-- !query
+DROP TABLE cte_tbl2
+-- !query analysis
+DropTable false, false
++- ResolvedIdentifier V2SessionCatalog(spark_catalog), default.cte_tbl2
diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/postgreSQL/with.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/postgreSQL/with.sql.out
index e53480e96be..f58f8faa0be 100644
--- a/sql/core/src/test/resources/sql-tests/analyzer-results/postgreSQL/with.sql.out
+++ b/sql/core/src/test/resources/sql-tests/analyzer-results/postgreSQL/with.sql.out
@@ -452,10 +452,14 @@ with test as (select 42) insert into test select * from test
 -- !query analysis
 InsertIntoHadoopFsRelationCommand file:[not included in comparison]/{warehouse_dir}/test, false, Parquet, [path=file:[not included in comparison]/{warehouse_dir}/test], Append, `spark_catalog`.`default`.`test`, org.apache.spark.sql.execution.datasources.InMemoryFileIndex(file:[not included in comparison]/{warehouse_dir}/test), [i]
 +- Project [cast(42#x as int) AS i#x]
-   +- Project [42#x]
-      +- SubqueryAlias test
-         +- Project [42 AS 42#x]
-            +- OneRowRelation
+   +- WithCTE
+      :- CTERelationDef xxxx, false
+      :  +- SubqueryAlias test
+      :     +- Project [42 AS 42#x]
+      :        +- OneRowRelation
+      +- Project [42#x]
+         +- SubqueryAlias test
+            +- CTERelationRef xxxx, true, [42#x]
 
 
 -- !query
diff --git a/sql/core/src/test/resources/sql-tests/inputs/cte-command.sql b/sql/core/src/test/resources/sql-tests/inputs/cte-command.sql
new file mode 100644
index 00000000000..ee90c2de49e
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/inputs/cte-command.sql
@@ -0,0 +1,33 @@
+-- WITH inside CTAS
+CREATE TABLE cte_tbl USING csv AS WITH s AS (SELECT 42 AS col) SELECT * FROM s;
+
+SELECT * FROM cte_tbl;
+
+-- WITH inside CREATE VIEW
+CREATE TEMPORARY VIEW cte_view AS WITH s AS (SELECT 42 AS col) SELECT * FROM s;
+
+SELECT * FROM cte_view;
+
+-- INSERT inside WITH
+WITH s AS (SELECT 43 AS col)
+INSERT INTO cte_tbl SELECT * FROM S;
+
+SELECT * FROM cte_tbl;
+
+-- WITH inside INSERT
+INSERT INTO cte_tbl WITH s AS (SELECT 44 AS col) SELECT * FROM s;
+
+SELECT * FROM cte_tbl;
+
+CREATE TABLE cte_tbl2 (col INT) USING csv;
+-- Multi-INSERT
+WITH s AS (SELECT 45 AS col)
+FROM s
+INSERT INTO cte_tbl SELECT col
+INSERT INTO cte_tbl2 SELECT col;
+
+SELECT * FROM cte_tbl;
+SELECT * FROM cte_tbl2;
+
+DROP TABLE cte_tbl;
+DROP TABLE cte_tbl2;
diff --git a/sql/core/src/test/resources/sql-tests/results/cte-command.sql.out b/sql/core/src/test/resources/sql-tests/results/cte-command.sql.out
new file mode 100644
index 00000000000..67ac321a195
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/results/cte-command.sql.out
@@ -0,0 +1,121 @@
+-- Automatically generated by SQLQueryTestSuite
+-- !query
+CREATE TABLE cte_tbl USING csv AS WITH s AS (SELECT 42 AS col) SELECT * FROM s
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+SELECT * FROM cte_tbl
+-- !query schema
+struct<col:int>
+-- !query output
+42
+
+
+-- !query
+CREATE TEMPORARY VIEW cte_view AS WITH s AS (SELECT 42 AS col) SELECT * FROM s
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+SELECT * FROM cte_view
+-- !query schema
+struct<col:int>
+-- !query output
+42
+
+
+-- !query
+WITH s AS (SELECT 43 AS col)
+INSERT INTO cte_tbl SELECT * FROM S
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+SELECT * FROM cte_tbl
+-- !query schema
+struct<col:int>
+-- !query output
+42
+43
+
+
+-- !query
+INSERT INTO cte_tbl WITH s AS (SELECT 44 AS col) SELECT * FROM s
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+SELECT * FROM cte_tbl
+-- !query schema
+struct<col:int>
+-- !query output
+42
+43
+44
+
+
+-- !query
+CREATE TABLE cte_tbl2 (col INT) USING csv
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+WITH s AS (SELECT 45 AS col)
+FROM s
+INSERT INTO cte_tbl SELECT col
+INSERT INTO cte_tbl2 SELECT col
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+SELECT * FROM cte_tbl
+-- !query schema
+struct<col:int>
+-- !query output
+42
+43
+44
+45
+
+
+-- !query
+SELECT * FROM cte_tbl2
+-- !query schema
+struct<col:int>
+-- !query output
+45
+
+
+-- !query
+DROP TABLE cte_tbl
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+DROP TABLE cte_tbl2
+-- !query schema
+struct<>
+-- !query output
+
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
index 4127e7c75d7..eef2ae1f737 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
@@ -21,7 +21,7 @@ import scala.util.control.NonFatal
 
 import org.apache.spark.sql.{Row, SaveMode, SparkSession}
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.{CTEInChildren, CTERelationDef, LogicalPlan, WithCTE}
 import org.apache.spark.sql.catalyst.util.CharVarcharUtils
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.command.{DataWritingCommand, LeafRunnableCommand}
@@ -38,7 +38,7 @@ case class CreateHiveTableAsSelectCommand(
     query: LogicalPlan,
     outputColumnNames: Seq[String],
     mode: SaveMode)
-  extends LeafRunnableCommand {
+  extends LeafRunnableCommand with CTEInChildren {
   assert(query.resolved)
   override def innerChildren: Seq[LogicalPlan] = query :: Nil
 
@@ -111,4 +111,8 @@ case class CreateHiveTableAsSelectCommand(
     s"[Database: ${tableDesc.database}, " +
       s"TableName: ${tableDesc.identifier.table}]"
   }
+
+  override def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan = {
+    copy(query = WithCTE(query, cteDefs))
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

