Repository: spark
Updated Branches:
  refs/heads/branch-2.0 ca66f52ff -> d1a021178


[SPARK-17892][SQL][2.0] Do Not Optimize Query in CTAS More Than Once #15048

### What changes were proposed in this pull request?
This PR backports https://github.com/apache/spark/pull/15048 and
https://github.com/apache/spark/pull/15459 to branch-2.0.

However, branch-2.0 does not have the unified logical node `CreateTable`, and the
analyzer rule `PreWriteCheck` is also different. To minimize the code changes, this
PR adds a new analyzer rule, `AnalyzeCreateTableAsSelect`. Please review it as a new
PR. Thanks!

As explained in https://github.com/apache/spark/pull/14797:
> Some analyzer rules make assumptions about logical plans, and the optimizer may
> break those assumptions. We should not pass an optimized query plan back into
> `QueryExecution` (where it will be analyzed again), otherwise we may hit weird
> bugs. For example, we have a rule for decimal calculation that promotes the
> precision before binary operations and uses `PromotePrecision` as a placeholder
> to indicate that the rule should not be applied twice. But an optimizer rule
> removes this placeholder, which breaks the assumption; the rule is then applied
> twice and produces a wrong result.
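
Below is a minimal sketch of the anti-pattern described above: feeding an already
optimized plan back through `QueryExecution`, so it is analyzed and optimized a
second time. The helper is hypothetical (not part of this patch) and assumes access
to the `private[sql]` `sessionState`, e.g. from code inside Spark's own packages.
```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Hypothetical helper illustrating double optimization; not part of this patch.
def optimizeTwice(spark: SparkSession, plan: LogicalPlan): LogicalPlan = {
  val optimizedOnce = spark.sessionState.executePlan(plan).optimizedPlan
  // Re-analyzing an already optimized plan re-runs analyzer rules whose placeholder
  // nodes (e.g. PromotePrecision) may have been stripped by the optimizer.
  spark.sessionState.executePlan(optimizedOnce).optimizedPlan
}
```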

We should not optimize the query in CTAS more than once. For example,
```Scala
spark.range(99, 101).createOrReplaceTempView("tab1")
val sqlStmt = "SELECT id, cast(id as long) * cast('1.0' as decimal(38, 18)) as num FROM tab1"
sql(s"CREATE TABLE tab2 USING PARQUET AS $sqlStmt")
checkAnswer(spark.table("tab2"), sql(sqlStmt))
```
Before this PR, the results do not match
```
== Results ==
!== Correct Answer - 2 ==       == Spark Answer - 2 ==
![100,100.000000000000000000]   [100,null]
 [99,99.000000000000000000]     [99,99.000000000000000000]
```
After this PR, the results match.
```
+---+----------------------+
|id |num                   |
+---+----------------------+
|99 |99.000000000000000000 |
|100|100.000000000000000000|
+---+----------------------+
```

In this PR, we no longer treat the `query` in CTAS as a child. Thus, the `query`
is not optimized again when the CTAS statement itself is optimized. However, we
still need to analyze the `query` so that the Analyzer can normalize and verify the
CTAS; in this backport that is done in the new analyzer rule
`AnalyzeCreateTableAsSelect`, because so far it is the only rule that needs the
analyzed plan of the `query`.
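
A condensed sketch of the approach, with shapes taken from the diff below (the
`*Sketch` names are illustrative stand-ins, not the actual classes): the CTAS node
is a leaf `Command` that exposes its `query` only through `innerChildren`, so
optimizer batches never rewrite it, while a dedicated analyzer rule analyzes the
`query` exactly once.
```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule

// Stand-in for CreateTableUsingAsSelect: `query` is not a child, so analyzer and
// optimizer batches that transform children leave it untouched.
case class CtasSketch(query: LogicalPlan) extends Command {
  override def innerChildren: Seq[QueryPlan[_]] = Seq(query)
  override lazy val resolved: Boolean = query.resolved
}

// Stand-in for AnalyzeCreateTableAsSelect: analyze the query once, eagerly.
case class AnalyzeCtasSketch(sparkSession: SparkSession) extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case c: CtasSketch if !c.query.resolved =>
      val qe = sparkSession.sessionState.executePlan(c.query)
      qe.assertAnalyzed()
      c.copy(query = qe.analyzed)
  }
}
```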

### How was this patch tested?
Added test cases to `DataFrameSuite`, `CreateTableAsSelectSuite`, and
`MetastoreRelationSuite`.

Author: gatorsmile <gatorsm...@gmail.com>

Closes #15502 from gatorsmile/ctasOptimize2.0.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d1a02117
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d1a02117
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d1a02117

Branch: refs/heads/branch-2.0
Commit: d1a02117862b20d0e8e58f4c6da6a97665a02590
Parents: ca66f52
Author: gatorsmile <gatorsm...@gmail.com>
Authored: Mon Oct 17 15:29:53 2016 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Mon Oct 17 15:29:53 2016 +0800

----------------------------------------------------------------------
 .../sql/catalyst/plans/logical/Command.scala    |  7 +++++-
 .../analysis/UnsupportedOperationsSuite.scala   |  5 +---
 .../spark/sql/execution/SparkStrategies.scala   |  2 +-
 .../sql/execution/command/SetCommand.scala      |  2 --
 .../spark/sql/execution/command/cache.scala     |  6 -----
 .../spark/sql/execution/command/commands.scala  |  4 +--
 .../spark/sql/execution/command/databases.scala |  2 --
 .../spark/sql/execution/command/ddl.scala       |  6 -----
 .../spark/sql/execution/command/tables.scala    | 11 +++++----
 .../spark/sql/execution/datasources/ddl.scala   | 17 +++++--------
 .../spark/sql/execution/datasources/rules.scala | 26 +++++++++++++++++---
 .../spark/sql/internal/SessionState.scala       |  3 ++-
 .../org/apache/spark/sql/DataFrameSuite.scala   | 20 +++++++++++++++
 .../sql/sources/CreateTableAsSelectSuite.scala  | 12 +++++++++
 .../spark/sql/hive/HiveMetastoreCatalog.scala   |  6 ++---
 .../spark/sql/hive/HiveSessionState.scala       |  1 +
 .../spark/sql/hive/MetastoreRelationSuite.scala | 22 +++++++++++++++--
 17 files changed, 102 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d1a02117/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Command.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Command.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Command.scala
index 75a5b10..64f5783 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Command.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Command.scala
@@ -17,9 +17,14 @@
 
 package org.apache.spark.sql.catalyst.plans.logical
 
+import org.apache.spark.sql.catalyst.expressions.Attribute
+
 /**
  * A logical node that represents a non-query command to be executed by the 
system.  For example,
  * commands can be used by parsers to represent DDL operations.  Commands, 
unlike queries, are
  * eagerly executed.
  */
-trait Command
+trait Command extends LeafNode {
+  final override def children: Seq[LogicalPlan] = Seq.empty
+  override def output: Seq[Attribute] = Seq.empty
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/d1a02117/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
index 6df47ac..ff1bb12 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
@@ -31,10 +31,7 @@ import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.IntegerType
 
 /** A dummy command for testing unsupported operations. */
-case class DummyCommand() extends LogicalPlan with Command {
-  override def output: Seq[Attribute] = Nil
-  override def children: Seq[LogicalPlan] = Nil
-}
+case class DummyCommand() extends Command
 
 class UnsupportedOperationsSuite extends SparkFunSuite {
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d1a02117/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index e7faab5..ccfb954 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -449,7 +449,7 @@ abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
             c.bucketSpec,
             c.mode,
             c.options,
-            c.child)
+            c.query)
         ExecutedCommandExec(cmd) :: Nil
 
       case c: CreateTempViewUsing =>

http://git-wip-us.apache.org/repos/asf/spark/blob/d1a02117/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala
index b0e2d03..af6def5 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala
@@ -129,6 +129,4 @@ case object ResetCommand extends RunnableCommand with 
Logging {
     sparkSession.sessionState.conf.clear()
     Seq.empty[Row]
   }
-
-  override val output: Seq[Attribute] = Seq.empty
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/d1a02117/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
index 697e2ff..605e49c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
@@ -47,8 +47,6 @@ case class CacheTableCommand(
 
     Seq.empty[Row]
   }
-
-  override def output: Seq[Attribute] = Seq.empty
 }
 
 
@@ -58,8 +56,6 @@ case class UncacheTableCommand(tableIdent: TableIdentifier) 
extends RunnableComm
     sparkSession.catalog.uncacheTable(tableIdent.quotedString)
     Seq.empty[Row]
   }
-
-  override def output: Seq[Attribute] = Seq.empty
 }
 
 /**
@@ -71,6 +67,4 @@ case object ClearCacheCommand extends RunnableCommand {
     sparkSession.catalog.clearCache()
     Seq.empty[Row]
   }
-
-  override def output: Seq[Attribute] = Seq.empty
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/d1a02117/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
index 424a962..698c625 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
@@ -35,9 +35,7 @@ import org.apache.spark.sql.types._
  * A logical command that is executed for its side-effects.  
`RunnableCommand`s are
  * wrapped in `ExecutedCommand` during execution.
  */
-trait RunnableCommand extends LogicalPlan with logical.Command {
-  override def output: Seq[Attribute] = Seq.empty
-  final override def children: Seq[LogicalPlan] = Seq.empty
+trait RunnableCommand extends logical.Command {
   def run(sparkSession: SparkSession): Seq[Row]
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d1a02117/sql/core/src/main/scala/org/apache/spark/sql/execution/command/databases.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/databases.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/databases.scala
index 597ec27..e5a6a5f 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/databases.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/databases.scala
@@ -59,6 +59,4 @@ case class SetDatabaseCommand(databaseName: String) extends 
RunnableCommand {
     sparkSession.sessionState.catalog.setCurrentDatabase(databaseName)
     Seq.empty[Row]
   }
-
-  override val output: Seq[Attribute] = Seq.empty
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/d1a02117/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 16deee3..d63d29d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -72,8 +72,6 @@ case class CreateDatabaseCommand(
       ifNotExists)
     Seq.empty[Row]
   }
-
-  override val output: Seq[Attribute] = Seq.empty
 }
 
 
@@ -103,8 +101,6 @@ case class DropDatabaseCommand(
     sparkSession.sessionState.catalog.dropDatabase(databaseName, ifExists, 
cascade)
     Seq.empty[Row]
   }
-
-  override val output: Seq[Attribute] = Seq.empty
 }
 
 /**
@@ -128,8 +124,6 @@ case class AlterDatabasePropertiesCommand(
 
     Seq.empty[Row]
   }
-
-  override val output: Seq[Attribute] = Seq.empty
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/d1a02117/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 9253db0..ad0c779 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -34,7 +34,8 @@ import 
org.apache.spark.sql.catalyst.catalog.CatalogTableType._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference}
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
-import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan, 
UnaryNode}
+import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}
 import org.apache.spark.sql.catalyst.util.quoteIdentifier
 import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._
 import org.apache.spark.sql.execution.datasources.PartitioningUtils
@@ -43,10 +44,10 @@ import org.apache.spark.util.Utils
 
 case class CreateHiveTableAsSelectLogicalPlan(
     tableDesc: CatalogTable,
-    child: LogicalPlan,
-    allowExisting: Boolean) extends UnaryNode with Command {
+    query: LogicalPlan,
+    allowExisting: Boolean) extends Command {
 
-  override def output: Seq[Attribute] = Seq.empty[Attribute]
+  override def innerChildren: Seq[QueryPlan[_]] = Seq(query)
 
   override lazy val resolved: Boolean =
     tableDesc.identifier.database.isDefined &&
@@ -54,7 +55,7 @@ case class CreateHiveTableAsSelectLogicalPlan(
       tableDesc.storage.serde.isDefined &&
       tableDesc.storage.inputFormat.isDefined &&
       tableDesc.storage.outputFormat.isDefined &&
-      childrenResolved
+      query.resolved
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/d1a02117/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
index 31a2075..857047f 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.command.RunnableCommand
@@ -41,17 +41,10 @@ case class CreateTableUsing(
     partitionColumns: Array[String],
     bucketSpec: Option[BucketSpec],
     allowExisting: Boolean,
-    managedIfNoPath: Boolean) extends LogicalPlan with logical.Command {
-
-  override def output: Seq[Attribute] = Seq.empty
-  override def children: Seq[LogicalPlan] = Seq.empty
-}
+    managedIfNoPath: Boolean) extends logical.Command
 
 /**
  * A node used to support CTAS statements and saveAsTable for the data source 
API.
- * This node is a [[logical.UnaryNode]] instead of a [[logical.Command]] 
because we want the
- * analyzer can analyze the logical plan that will be used to populate the 
table.
- * So, [[PreWriteCheck]] can detect cases that are not allowed.
  */
 case class CreateTableUsingAsSelect(
     tableIdent: TableIdentifier,
@@ -60,8 +53,10 @@ case class CreateTableUsingAsSelect(
     bucketSpec: Option[BucketSpec],
     mode: SaveMode,
     options: Map[String, String],
-    child: LogicalPlan) extends logical.UnaryNode {
-  override def output: Seq[Attribute] = Seq.empty[Attribute]
+    query: LogicalPlan) extends logical.Command {
+
+  override def innerChildren: Seq[QueryPlan[_]] = Seq(query)
+  override lazy val resolved: Boolean = query.resolved
 }
 
 case class CreateTempViewUsing(

http://git-wip-us.apache.org/repos/asf/spark/blob/d1a02117/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index 05908d9..27420d5 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, 
Attribute, Cast, RowOrd
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
+import 
org.apache.spark.sql.execution.command.CreateHiveTableAsSelectLogicalPlan
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.{BaseRelation, InsertableRelation}
 
@@ -62,6 +63,25 @@ class ResolveDataSource(sparkSession: SparkSession) extends 
Rule[LogicalPlan] {
 }
 
 /**
+ * Analyze the query in CREATE TABLE AS SELECT (CTAS). After analysis, 
[[PreWriteCheck]] also
+ * can detect the cases that are not allowed.
+ */
+case class AnalyzeCreateTableAsSelect(sparkSession: SparkSession) extends 
Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case c: CreateTableUsingAsSelect if !c.query.resolved =>
+      c.copy(query = analyzeQuery(c.query))
+    case c: CreateHiveTableAsSelectLogicalPlan if !c.query.resolved =>
+      c.copy(query = analyzeQuery(c.query))
+  }
+
+  private def analyzeQuery(query: LogicalPlan): LogicalPlan = {
+    val qe = sparkSession.sessionState.executePlan(query)
+    qe.assertAnalyzed()
+    qe.analyzed
+  }
+}
+
+/**
  * Preprocess the [[InsertIntoTable]] plan. Throws exception if the number of 
columns mismatch, or
  * specified partition columns are different from the existing partition 
columns in the target
  * table. It also does data type casting and field renaming, to make sure that 
the columns to be
@@ -216,7 +236,7 @@ case class PreWriteCheck(conf: SQLConf, catalog: 
SessionCatalog)
             // (the relation is a BaseRelation).
             case l @ LogicalRelation(dest: BaseRelation, _, _) =>
               // Get all input data source relations of the query.
-              val srcRelations = c.child.collect {
+              val srcRelations = c.query.collect {
                 case LogicalRelation(src: BaseRelation, _, _) => src
               }
               if (srcRelations.contains(dest)) {
@@ -233,12 +253,12 @@ case class PreWriteCheck(conf: SQLConf, catalog: 
SessionCatalog)
         }
 
         PartitioningUtils.validatePartitionColumn(
-          c.child.schema, c.partitionColumns, conf.caseSensitiveAnalysis)
+          c.query.schema, c.partitionColumns, conf.caseSensitiveAnalysis)
 
         for {
           spec <- c.bucketSpec
           sortColumnName <- spec.sortColumnNames
-          sortColumn <- c.child.schema.find(_.name == sortColumnName)
+          sortColumn <- c.query.schema.find(_.name == sortColumnName)
         } {
           if (!RowOrdering.isOrderable(sortColumn.dataType)) {
             failAnalysis(s"Cannot use ${sortColumn.dataType.simpleString} for 
sorting column.")

http://git-wip-us.apache.org/repos/asf/spark/blob/d1a02117/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index 01cc13f..e054ef2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.command.AnalyzeTableCommand
-import org.apache.spark.sql.execution.datasources.{DataSourceAnalysis, 
FindDataSourceTable, PreprocessTableInsertion, ResolveDataSource}
+import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.streaming.{StreamingQuery, StreamingQueryManager}
 import org.apache.spark.sql.util.ExecutionListenerManager
 
@@ -111,6 +111,7 @@ private[sql] class SessionState(sparkSession: SparkSession) 
{
   lazy val analyzer: Analyzer = {
     new Analyzer(catalog, conf) {
       override val extendedResolutionRules =
+        AnalyzeCreateTableAsSelect(sparkSession) ::
         PreprocessTableInsertion(conf) ::
         new FindDataSourceTable(sparkSession) ::
         DataSourceAnalysis(conf) ::

http://git-wip-us.apache.org/repos/asf/spark/blob/d1a02117/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index da5c538..7ab0fe0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -27,9 +27,11 @@ import scala.util.Random
 import org.scalatest.Matchers._
 
 import org.apache.spark.SparkException
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.plans.logical.{OneRowRelation, Union}
 import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.execution.aggregate.HashAggregateExec
+import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._
 import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, 
ReusedExchangeExec, ShuffleExchange}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
@@ -1565,4 +1567,22 @@ class DataFrameSuite extends QueryTest with 
SharedSQLContext {
     val d = sampleDf.withColumn("c", 
monotonically_increasing_id).select($"c").collect
     assert(d.size == d.distinct.size)
   }
+
+  test("SPARK-17409: Do Not Optimize Query in CTAS (Data source tables) More 
Than Once") {
+    withTable("bar") {
+      withTempView("foo") {
+        withSQLConf(SQLConf.DEFAULT_DATA_SOURCE_NAME.key -> "json") {
+          sql("select 0 as id").createOrReplaceTempView("foo")
+          val df = sql("select * from foo group by id")
+          // If we optimize the query in CTAS more than once, the following 
saveAsTable will fail
+          // with the error: `GROUP BY position 0 is not in select list (valid 
range is [1, 1])`
+          df.write.mode("overwrite").saveAsTable("bar")
+          checkAnswer(spark.table("bar"), Row(0) :: Nil)
+          val tableMetadata = 
spark.sessionState.catalog.getTableMetadata(TableIdentifier("bar"))
+          assert(tableMetadata.properties(DATASOURCE_PROVIDER) == "json",
+            "the expected table is a data source table using json")
+        }
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/d1a02117/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
index 251a256..ea1f7a5 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
@@ -220,4 +220,16 @@ class CreateTableAsSelectSuite
         Some(BucketSpec(5, Seq("a"), Seq("b"))))
     }
   }
+
+  test("SPARK-17409: CTAS of decimal calculation") {
+    withTable("tab2") {
+      withTempView("tab1") {
+        spark.range(99, 101).createOrReplaceTempView("tab1")
+        val sqlStmt =
+          "SELECT id, cast(id as long) * cast('1.0' as decimal(38, 18)) as num 
FROM tab1"
+        sql(s"CREATE TABLE tab2 USING PARQUET AS $sqlStmt")
+        checkAnswer(spark.table("tab2"), sql(sqlStmt))
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/d1a02117/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index bafb422..e7d1ed3 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -449,7 +449,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: 
SparkSession) extends Log
       case p: LogicalPlan if !p.childrenResolved => p
       case p: LogicalPlan if p.resolved => p
 
-      case p @ CreateHiveTableAsSelectLogicalPlan(table, child, allowExisting) 
=>
+      case p @ CreateHiveTableAsSelectLogicalPlan(table, query, allowExisting) 
=>
         val desc = if (table.storage.serde.isEmpty) {
           // add default serde
           table.withNewStorage(
@@ -462,7 +462,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: 
SparkSession) extends Log
 
         execution.CreateHiveTableAsSelectCommand(
           desc.copy(identifier = TableIdentifier(tblName, Some(dbName))),
-          child,
+          query,
           allowExisting)
     }
   }
@@ -510,7 +510,7 @@ private[hive] case class InsertIntoHiveTable(
     child: LogicalPlan,
     overwrite: Boolean,
     ifNotExists: Boolean)
-  extends LogicalPlan with Command {
+  extends LogicalPlan {
 
   override def children: Seq[LogicalPlan] = child :: Nil
   override def output: Seq[Attribute] = Seq.empty

http://git-wip-us.apache.org/repos/asf/spark/blob/d1a02117/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index 8773993..822a770 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -62,6 +62,7 @@ private[hive] class HiveSessionState(sparkSession: 
SparkSession)
   override lazy val analyzer: Analyzer = {
     new Analyzer(catalog, conf) {
       override val extendedResolutionRules =
+        AnalyzeCreateTableAsSelect(sparkSession) ::
         catalog.ParquetConversions ::
         catalog.OrcConversions ::
         catalog.CreateTables ::

http://git-wip-us.apache.org/repos/asf/spark/blob/d1a02117/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreRelationSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreRelationSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreRelationSuite.scala
index eec60b4..c6711c3 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreRelationSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreRelationSuite.scala
@@ -17,11 +17,14 @@
 
 package org.apache.spark.sql.hive
 
-import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.{QueryTest, Row}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, 
CatalogStorageFormat, CatalogTable, CatalogTableType}
+import org.apache.spark.sql.execution.command.DDLUtils
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.test.SQLTestUtils
 
-class MetastoreRelationSuite extends SparkFunSuite {
+class MetastoreRelationSuite extends QueryTest with SQLTestUtils with 
TestHiveSingleton {
   test("makeCopy and toJSON should work") {
     val table = CatalogTable(
       identifier = TableIdentifier("test", Some("db")),
@@ -35,4 +38,19 @@ class MetastoreRelationSuite extends SparkFunSuite {
     // No exception should be thrown
     relation.toJSON
   }
+
+  test("SPARK-17409: Do Not Optimize Query in CTAS (Hive Serde Table) More 
Than Once") {
+    withTable("bar") {
+      withTempView("foo") {
+        sql("select 0 as id").createOrReplaceTempView("foo")
+        // If we optimize the query in CTAS more than once, the following 
saveAsTable will fail
+        // with the error: `GROUP BY position 0 is not in select list (valid 
range is [1, 1])`
+        sql("CREATE TABLE bar AS SELECT * FROM foo group by id")
+        checkAnswer(spark.table("bar"), Row(0) :: Nil)
+        val tableMetadata = 
spark.sessionState.catalog.getTableMetadata(TableIdentifier("bar"))
+        assert(!DDLUtils.isDatasourceTable(tableMetadata),
+          "the expected table is a Hive serde table")
+      }
+    }
+  }
 }

