Repository: spark
Updated Branches:
  refs/heads/master 3180272d2 -> 57e97fcbd


[SPARK-18029][SQL] PruneFileSourcePartitions should not change the output of 
LogicalRelation

## What changes were proposed in this pull request?

In `PruneFileSourcePartitions`, we replace the `LogicalRelation` with a 
pruned one. However, this replacement may change the output of the 
`LogicalRelation` if it doesn't have `expectedOutputAttributes`. This PR fixes 
that by passing the original relation's output as `expectedOutputAttributes` 
when creating the pruned copy.

## How was this patch tested?

Tested by the new `PruneFileSourcePartitionsSuite`, which checks that the 
optimized plan has no missing input attributes.

Author: Wenchen Fan <[email protected]>

Closes #15569 from cloud-fan/partition-bug.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/57e97fcb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/57e97fcb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/57e97fcb

Branch: refs/heads/master
Commit: 57e97fcbd6fe62af4acd60896feeacfa21efc222
Parents: 3180272
Author: Wenchen Fan <[email protected]>
Authored: Fri Oct 21 12:27:53 2016 +0800
Committer: Wenchen Fan <[email protected]>
Committed: Fri Oct 21 12:27:53 2016 +0800

----------------------------------------------------------------------
 .../spark/sql/catalyst/catalog/interface.scala  |  4 +-
 .../datasources/PruneFileSourcePartitions.scala |  4 +-
 .../spark/sql/hive/HiveDataFrameSuite.scala     |  7 +-
 .../spark/sql/hive/HiveMetadataCacheSuite.scala |  3 +-
 .../PruneFileSourcePartitionsSuite.scala        | 74 ++++++++++++++++++++
 5 files changed, 85 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/57e97fcb/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 1a57a77..a97ed70 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -102,8 +102,8 @@ case class CatalogTablePartition(
    * Given the partition schema, returns a row with that schema holding the 
partition values.
    */
   def toRow(partitionSchema: StructType): InternalRow = {
-    InternalRow.fromSeq(partitionSchema.map { case StructField(name, dataType, 
_, _) =>
-      Cast(Literal(spec(name)), dataType).eval()
+    InternalRow.fromSeq(partitionSchema.map { field =>
+      Cast(Literal(spec(field.name)), field.dataType).eval()
     })
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/57e97fcb/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
index 29121a4..8689017 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
@@ -59,7 +59,9 @@ private[sql] object PruneFileSourcePartitions extends 
Rule[LogicalPlan] {
         val prunedFileCatalog = 
tableFileCatalog.filterPartitions(partitionKeyFilters.toSeq)
         val prunedFsRelation =
           fsRelation.copy(location = prunedFileCatalog)(sparkSession)
-        val prunedLogicalRelation = logicalRelation.copy(relation = 
prunedFsRelation)
+        val prunedLogicalRelation = logicalRelation.copy(
+          relation = prunedFsRelation,
+          expectedOutputAttributes = Some(logicalRelation.output))
 
         // Keep partition-pruning predicates so that they are visible in 
physical planning
         val filterExpression = filters.reduceLeft(And)

http://git-wip-us.apache.org/repos/asf/spark/blob/57e97fcb/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameSuite.scala 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameSuite.scala
index f65e74d..1552343 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameSuite.scala
@@ -20,9 +20,10 @@ package org.apache.spark.sql.hive
 import java.io.File
 
 import org.apache.spark.metrics.source.HiveCatalogMetrics
+import org.apache.spark.sql.QueryTest
 import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SQLTestUtils
-import org.apache.spark.sql.QueryTest
 
 class HiveDataFrameSuite extends QueryTest with TestHiveSingleton with 
SQLTestUtils {
   test("table name with schema") {
@@ -78,7 +79,7 @@ class HiveDataFrameSuite extends QueryTest with 
TestHiveSingleton with SQLTestUt
   }
 
   test("lazy partition pruning reads only necessary partition data") {
-    withSQLConf("spark.sql.hive.filesourcePartitionPruning" -> "true") {
+    withSQLConf(SQLConf.HIVE_FILESOURCE_PARTITION_PRUNING.key -> "true") {
       withTable("test") {
         withTempDir { dir =>
           setupPartitionedTable("test", dir)
@@ -114,7 +115,7 @@ class HiveDataFrameSuite extends QueryTest with 
TestHiveSingleton with SQLTestUt
   }
 
   test("all partitions read and cached when filesource partition pruning is 
off") {
-    withSQLConf("spark.sql.hive.filesourcePartitionPruning" -> "false") {
+    withSQLConf(SQLConf.HIVE_FILESOURCE_PARTITION_PRUNING.key -> "false") {
       withTable("test") {
         withTempDir { dir =>
           setupPartitionedTable("test", dir)

http://git-wip-us.apache.org/repos/asf/spark/blob/57e97fcb/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala
index 2ca1cd4..d290fe9 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala
@@ -22,6 +22,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.SparkException
 import org.apache.spark.sql.QueryTest
 import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SQLTestUtils
 
 /**
@@ -62,7 +63,7 @@ class HiveMetadataCacheSuite extends QueryTest with 
SQLTestUtils with TestHiveSi
 
   def testCaching(pruningEnabled: Boolean): Unit = {
     test(s"partitioned table is cached when partition pruning is 
$pruningEnabled") {
-      withSQLConf("spark.sql.hive.filesourcePartitionPruning" -> 
pruningEnabled.toString) {
+      withSQLConf(SQLConf.HIVE_FILESOURCE_PARTITION_PRUNING.key -> 
pruningEnabled.toString) {
         withTable("test") {
           withTempDir { dir =>
             spark.range(5).selectExpr("id", "id as f1", "id as f2").write

http://git-wip-us.apache.org/repos/asf/spark/blob/57e97fcb/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala
new file mode 100644
index 0000000..346ea0c
--- /dev/null
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, 
Project}
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation, PruneFileSourcePartitions, TableFileCatalog}
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.test.SQLTestUtils
+import org.apache.spark.sql.types.StructType
+
+class PruneFileSourcePartitionsSuite extends QueryTest with SQLTestUtils with 
TestHiveSingleton {
+
+  object Optimize extends RuleExecutor[LogicalPlan] {
+    val batches = Batch("PruneFileSourcePartitions", Once, 
PruneFileSourcePartitions) :: Nil
+  }
+
+  test("PruneFileSourcePartitions should not change the output of 
LogicalRelation") {
+    withTable("test") {
+      withTempDir { dir =>
+        sql(
+          s"""
+            |CREATE EXTERNAL TABLE test(i int)
+            |PARTITIONED BY (p int)
+            |STORED AS parquet
+            |LOCATION '${dir.getAbsolutePath}'""".stripMargin)
+
+        val tableMeta = spark.sharedState.externalCatalog.getTable("default", 
"test")
+        val tableFileCatalog = new TableFileCatalog(
+          spark,
+          tableMeta.database,
+          tableMeta.identifier.table,
+          Some(tableMeta.partitionSchema),
+          0)
+
+        val dataSchema = StructType(tableMeta.schema.filterNot { f =>
+          tableMeta.partitionColumnNames.contains(f.name)
+        })
+        val relation = HadoopFsRelation(
+          location = tableFileCatalog,
+          partitionSchema = tableMeta.partitionSchema,
+          dataSchema = dataSchema,
+          bucketSpec = None,
+          fileFormat = new ParquetFileFormat(),
+          options = Map.empty)(sparkSession = spark)
+
+        val logicalRelation = LogicalRelation(relation, catalogTable = 
Some(tableMeta))
+        val query = Project(Seq('i, 'p), Filter('p === 1, 
logicalRelation)).analyze
+
+        val optimized = Optimize.execute(query)
+        assert(optimized.missingInput.isEmpty)
+      }
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to