Repository: spark
Updated Branches:
  refs/heads/branch-2.3 a8e357ada -> 33ba8db8d


[SPARK-23523][SQL][BACKPORT-2.3] Fix the incorrect result caused by the rule 
OptimizeMetadataOnlyQuery

This PR is to backport https://github.com/apache/spark/pull/20684 and 
https://github.com/apache/spark/pull/20693 to Spark 2.3 branch

---

## What changes were proposed in this pull request?
```Scala
val tablePath = new File(s"${path.getCanonicalPath}/cOl3=c/cOl1=a/cOl5=e")
 Seq(("a", "b", "c", "d", "e")).toDF("cOl1", "cOl2", "cOl3", "cOl4", "cOl5")
 .write.json(tablePath.getCanonicalPath)
 val df = spark.read.json(path.getCanonicalPath).select("CoL1", "CoL5", 
"CoL3").distinct()
 df.show()
```

It generates an incorrect result.
```
[c,e,a]
```

We have a bug in the rule `OptimizeMetadataOnlyQuery`. We should respect the 
attribute order in the original leaf node. This PR is to fix it.

## How was this patch tested?
Added a test case

Author: Xingbo Jiang <xingbo.ji...@databricks.com>
Author: gatorsmile <gatorsm...@gmail.com>

Closes #20763 from gatorsmile/backport23523.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/33ba8db8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/33ba8db8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/33ba8db8

Branch: refs/heads/branch-2.3
Commit: 33ba8db8d8c8bf388606a6f5e34b082469038205
Parents: a8e357a
Author: Xingbo Jiang <xingbo.ji...@databricks.com>
Authored: Mon Mar 12 21:47:38 2018 -0700
Committer: gatorsmile <gatorsm...@gmail.com>
Committed: Mon Mar 12 21:47:38 2018 -0700

----------------------------------------------------------------------
 .../catalyst/plans/logical/LocalRelation.scala  |  9 ++++----
 .../execution/OptimizeMetadataOnlyQuery.scala   | 12 +++++++++--
 .../datasources/HadoopFsRelation.scala          |  3 +++
 .../OptimizeMetadataOnlyQuerySuite.scala        | 22 ++++++++++++++++++++
 4 files changed, 40 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/33ba8db8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala
index d73d7e7..b05508d 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala
@@ -43,10 +43,11 @@ object LocalRelation {
   }
 }
 
-case class LocalRelation(output: Seq[Attribute],
-                         data: Seq[InternalRow] = Nil,
-                         // Indicates whether this relation has data from a 
streaming source.
-                         override val isStreaming: Boolean = false)
+case class LocalRelation(
+    output: Seq[Attribute],
+    data: Seq[InternalRow] = Nil,
+    // Indicates whether this relation has data from a streaming source.
+    override val isStreaming: Boolean = false)
   extends LeafNode with analysis.MultiInstanceRelation {
 
   // A local relation must have resolved output.

http://git-wip-us.apache.org/repos/asf/spark/blob/33ba8db8/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
index 18f6f69..dc4aff9 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
@@ -17,6 +17,9 @@
 
 package org.apache.spark.sql.execution
 
+import java.util.Locale
+
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.catalog.{HiveTableRelation, 
SessionCatalog}
 import org.apache.spark.sql.catalyst.expressions._
@@ -80,8 +83,13 @@ case class OptimizeMetadataOnlyQuery(catalog: 
SessionCatalog) extends Rule[Logic
   private def getPartitionAttrs(
       partitionColumnNames: Seq[String],
       relation: LogicalPlan): Seq[Attribute] = {
-    val partColumns = partitionColumnNames.map(_.toLowerCase).toSet
-    relation.output.filter(a => partColumns.contains(a.name.toLowerCase))
+    val attrMap = relation.output.map(a => a.name.toLowerCase(Locale.ROOT) -> 
a).toMap
+    partitionColumnNames.map { colName =>
+      attrMap.getOrElse(colName.toLowerCase(Locale.ROOT),
+        throw new AnalysisException(s"Unable to find the column `$colName` " +
+          s"given [${relation.output.map(_.name).mkString(", ")}]")
+      )
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/33ba8db8/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
index 6b34638..b2f73b7 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
@@ -67,6 +67,9 @@ case class HadoopFsRelation(
     }
   }
 
+  // When data and partition schemas have overlapping columns, the output
+  // schema respects the order of the data schema for the overlapping columns, 
and it
+  // respects the data types of the partition schema.
   val schema: StructType = {
     StructType(dataSchema.map(f => overlappedPartCols.getOrElse(getColName(f), 
f)) ++
       partitionSchema.filterNot(f => 
overlappedPartCols.contains(getColName(f))))

http://git-wip-us.apache.org/repos/asf/spark/blob/33ba8db8/sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuerySuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuerySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuerySuite.scala
index 78c1e5d..a543eb8 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuerySuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuerySuite.scala
@@ -17,9 +17,12 @@
 
 package org.apache.spark.sql.execution
 
+import java.io.File
+
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.OPTIMIZER_METADATA_ONLY
 import org.apache.spark.sql.test.SharedSQLContext
 
 class OptimizeMetadataOnlyQuerySuite extends QueryTest with SharedSQLContext {
@@ -125,4 +128,23 @@ class OptimizeMetadataOnlyQuerySuite extends QueryTest 
with SharedSQLContext {
       sql("SELECT COUNT(DISTINCT p) FROM t_1000").collect()
     }
   }
+
+  test("Incorrect result caused by the rule OptimizeMetadataOnlyQuery") {
+    withSQLConf(OPTIMIZER_METADATA_ONLY.key -> "true") {
+      withTempPath { path =>
+        val tablePath = new 
File(s"${path.getCanonicalPath}/cOl3=c/cOl1=a/cOl5=e")
+        Seq(("a", "b", "c", "d", "e")).toDF("cOl1", "cOl2", "cOl3", "cOl4", 
"cOl5")
+          .write.json(tablePath.getCanonicalPath)
+
+        val df = spark.read.json(path.getCanonicalPath).select("CoL1", "CoL5", 
"CoL3").distinct()
+        checkAnswer(df, Row("a", "e", "c"))
+
+        val localRelation = df.queryExecution.optimizedPlan.collectFirst {
+          case l: LocalRelation => l
+        }
+        assert(localRelation.nonEmpty, "expect to see a LocalRelation")
+        assert(localRelation.get.output.map(_.name) == Seq("cOl3", "cOl1", 
"cOl5"))
+      }
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to