Repository: spark
Updated Branches:
  refs/heads/branch-2.0 03d9af604 -> c9c36fa0c


[SPARK-17230] [SQL] Should not pass optimized query into QueryExecution in DataFrameWriter

Some analyzer rules make assumptions about logical plans, and the optimizer may 
break those assumptions. We should not pass an optimized query plan into 
QueryExecution (where it will be analyzed again), otherwise we may hit some 
weird bugs.

For example, we have an analyzer rule for decimal calculation that promotes the 
precision before binary operations, using PromotePrecision as a placeholder to 
indicate that the rule should not be applied twice. But an optimizer rule 
removes this placeholder, breaking that assumption; the rule is then applied 
twice and produces a wrong result.
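
A minimal sketch of the symptom, adapted from the regression test added by this 
patch (run against branch-2.0 in spark-shell; the output path is a stand-in):

    import org.apache.spark.sql.SaveMode

    // The analyzer wraps the decimal multiplication in PromotePrecision.
    val df = spark.range(99, 101)
      .selectExpr("id", "cast(id as long) * cast('1.0' as decimal(38, 18)) as num")

    // Before this fix, DataFrameWriter fed the already-optimized plan back
    // into QueryExecution, so the precision rule could be applied a second
    // time and the written values could be wrong.
    df.write.mode(SaveMode.Overwrite).parquet("/tmp/spark-17230")

    // Reading the data back should return exactly the rows of `df`.
    spark.read.parquet("/tmp/spark-17230").show()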

Ideally, we should make all the analyzer rules idempotent, but that would 
require a lot of effort to double-check them one by one (it may not be easy).

An easier approach is to never feed an optimized plan into the Analyzer. This 
PR fixes the case of RunnableCommand: these commands are optimized, and during 
execution the `query` they carry would be passed into QueryExecution again. 
This PR makes such a `query` not part of the children (exposing it via 
`innerChildren` instead), so it will not be optimized and analyzed again; see 
the sketch below.
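
The core of the change, condensed from the diff hunks below (constructor 
parameters elided):

    trait RunnableCommand extends LogicalPlan with logical.Command {
      override def output: Seq[Attribute] = Seq.empty
      // `final` stops subclasses from re-exposing their query as a child,
      // which would send an already optimized plan through the analyzer again.
      final override def children: Seq[LogicalPlan] = Seq.empty
      def run(sparkSession: SparkSession): Seq[Row]
    }

    case class InsertIntoHadoopFsRelationCommand(query: LogicalPlan /* ... */)
      extends RunnableCommand {
      // innerChildren is only used for things like explain output, so the
      // query is still displayed but never re-analyzed or re-optimized.
      override protected def innerChildren: Seq[LogicalPlan] = query :: Nil
    }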

Right now, we cannot tell whether a logical plan has already been optimized. 
We could introduce a flag for that, and make sure an optimized logical plan is 
never analyzed again.
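
One hypothetical shape for such a flag (purely illustrative, not part of this 
patch):

    abstract class LogicalPlan extends QueryPlan[LogicalPlan] {
      // Hypothetical marker the analyzer could check before running,
      // refusing to re-analyze a plan that has already been optimized.
      private[sql] var optimized: Boolean = false
    }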

Added regression tests.

Author: Davies Liu <dav...@databricks.com>

Closes #14797 from davies/fix_writer.

(cherry picked from commit ed9c884dcf925500ceb388b06b33bd2c95cd2ada)
Signed-off-by: Davies Liu <davies....@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c9c36fa0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c9c36fa0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c9c36fa0

Branch: refs/heads/branch-2.0
Commit: c9c36fa0c7bccefde808bdbc32b04e8555356001
Parents: 03d9af6
Author: Davies Liu <dav...@databricks.com>
Authored: Fri Sep 2 15:10:12 2016 -0700
Committer: Davies Liu <davies....@gmail.com>
Committed: Fri Sep 2 15:12:38 2016 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/execution/command/commands.scala |  2 +-
 .../execution/command/createDataSourceTables.scala    |  2 +-
 .../spark/sql/execution/datasources/DataSource.scala  | 14 +++++++++++++-
 .../execution/datasources/DataSourceStrategy.scala    |  2 +-
 .../InsertIntoHadoopFsRelationCommand.scala           |  2 +-
 .../spark/sql/test/DataFrameReaderWriterSuite.scala   |  8 ++++++++
 .../execution/CreateHiveTableAsSelectCommand.scala    |  2 +-
 7 files changed, 26 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c9c36fa0/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
index cce1489..424a962 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
@@ -37,7 +37,7 @@ import org.apache.spark.sql.types._
  */
 trait RunnableCommand extends LogicalPlan with logical.Command {
   override def output: Seq[Attribute] = Seq.empty
-  override def children: Seq[LogicalPlan] = Seq.empty
+  final override def children: Seq[LogicalPlan] = Seq.empty
   def run(sparkSession: SparkSession): Seq[Row]
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/c9c36fa0/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index c38eca5..900446c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -139,7 +139,7 @@ case class CreateDataSourceTableAsSelectCommand(
     query: LogicalPlan)
   extends RunnableCommand {
 
-  override protected def innerChildren: Seq[QueryPlan[_]] = Seq(query)
+  override protected def innerChildren: Seq[LogicalPlan] = Seq(query)
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     // Since we are saving metadata to metastore, we need to check if metastore supports

http://git-wip-us.apache.org/repos/asf/spark/blob/c9c36fa0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index f5727da..784fea5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -30,6 +30,8 @@ import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
 import org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider
 import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
@@ -484,13 +486,23 @@ case class DataSource(
           }
         }
 
+        // SPARK-17230: Resolve the partition columns so InsertIntoHadoopFsRelationCommand does
+        // not need to have the query as a child, to avoid analyzing an optimized query,
+        // because InsertIntoHadoopFsRelationCommand will be optimized first.
+        val columns = partitionColumns.map { name =>
+          val plan = data.logicalPlan
+          plan.resolve(name :: Nil, data.sparkSession.sessionState.analyzer.resolver).getOrElse {
+            throw new AnalysisException(
+              s"Unable to resolve ${name} given [${plan.output.map(_.name).mkString(", ")}]")
+          }.asInstanceOf[Attribute]
+        }
         // For partitioned relation r, r.schema's column ordering can be different from the column
         // ordering of data.logicalPlan (partition columns are all moved after data column).  This
         // will be adjusted within InsertIntoHadoopFsRelation.
         val plan =
           InsertIntoHadoopFsRelationCommand(
             outputPath,
-            partitionColumns.map(UnresolvedAttribute.quoted),
+            columns,
             bucketSpec,
             format,
             () => Unit, // No existing table needs to be refreshed.

http://git-wip-us.apache.org/repos/asf/spark/blob/c9c36fa0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index bd65d02..6b4b3b8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -187,7 +187,7 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
 
       InsertIntoHadoopFsRelationCommand(
         outputPath,
-        t.partitionSchema.fields.map(_.name).map(UnresolvedAttribute(_)),
+        query.resolve(t.partitionSchema, t.sparkSession.sessionState.analyzer.resolver),
         t.bucketSpec,
         t.fileFormat,
         () => t.refresh(),

http://git-wip-us.apache.org/repos/asf/spark/blob/c9c36fa0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index d8b8fae..518b02b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -66,7 +66,7 @@ case class InsertIntoHadoopFsRelationCommand(
     mode: SaveMode)
   extends RunnableCommand {
 
-  override def children: Seq[LogicalPlan] = query :: Nil
+  override protected def innerChildren: Seq[LogicalPlan] = query :: Nil
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
    // Most formats don't do well with duplicate columns, so lets not allow that

http://git-wip-us.apache.org/repos/asf/spark/blob/c9c36fa0/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
index 27a0a2a..e071aef 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
@@ -424,6 +424,14 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be
     spark.range(10).write.orc(dir)
   }
 
+  test("SPARK-17230: write out results of decimal calculation") {
+    val df = spark.range(99, 101)
+      .selectExpr("id", "cast(id as long) * cast('1.0' as decimal(38, 18)) as num")
+    df.write.mode(SaveMode.Overwrite).parquet(dir)
+    val df2 = spark.read.parquet(dir)
+    checkAnswer(df2, df)
+  }
+
   private def testRead(
       df: => DataFrame,
       expectedResult: Seq[String],

http://git-wip-us.apache.org/repos/asf/spark/blob/c9c36fa0/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
index 3a8b0f1..a12b223 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
@@ -42,7 +42,7 @@ case class CreateHiveTableAsSelectCommand(
 
   private val tableIdentifier = tableDesc.identifier
 
-  override def children: Seq[LogicalPlan] = Seq(query)
+  override def innerChildren: Seq[LogicalPlan] = Seq(query)
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     lazy val metastoreRelation: MetastoreRelation = {

