Repository: spark
Updated Branches:
  refs/heads/branch-1.4 b97a8053a -> 70d9839cf


[SPARK-7749] [SQL] Fixes partition discovery for non-partitioned tables

When no partition columns can be found, we should have an empty 
`PartitionSpec`, rather than a `PartitionSpec` with empty partition columns.

This PR together with #6285 should fix SPARK-7749.
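
As a rough illustration of the intended behavior, here is a minimal sketch using
the `parsePartitions` and `PartitionSpec.emptySpec` names introduced in the diff
below (illustrative only, not an exact test from this patch):

    import org.apache.hadoop.fs.Path
    import org.apache.spark.sql.sources.{PartitioningUtils, PartitionSpec}

    // Paths with no `column=value` segments, i.e. a non-partitioned dataset.
    val paths = Seq(
      new Path("hdfs://host:9000/path1"),
      new Path("hdfs://host:9000/path2"))

    // After this change, partition discovery yields the shared empty spec
    // instead of a PartitionSpec carrying an empty partition-column StructType.
    val spec = PartitioningUtils.parsePartitions(
      paths, PartitioningUtils.DEFAULT_PARTITION_NAME)
    assert(spec == PartitionSpec.emptySpec)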

Author: Cheng Lian <l...@databricks.com>
Author: Yin Huai <yh...@databricks.com>

Closes #6287 from liancheng/spark-7749 and squashes the following commits:

a799ff3 [Cheng Lian] Adds test cases for SPARK-7749
c4949be [Cheng Lian] Minor refactoring, and tolerant _TEMPORARY directory name
5aa87ea [Yin Huai] Make parsePartitions more robust.
fc56656 [Cheng Lian] Returns empty PartitionSpec if no partition columns can be inferred
19ae41e [Cheng Lian] Don't list base directory as leaf directory

(cherry picked from commit 8730fbb47b09fcf955fe16dd03b75596db6d53b6)
Signed-off-by: Yin Huai <yh...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/70d9839c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/70d9839c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/70d9839c

Branch: refs/heads/branch-1.4
Commit: 70d9839cf3b553573c43d884b45a8b942e2f4770
Parents: b97a805
Author: Cheng Lian <l...@databricks.com>
Authored: Thu May 21 10:56:17 2015 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Thu May 21 10:56:26 2015 -0700

----------------------------------------------------------------------
 .../spark/sql/sources/PartitioningUtils.scala   | 84 +++++++++++++-------
 .../apache/spark/sql/sources/interfaces.scala   |  7 +-
 .../ParquetPartitionDiscoverySuite.scala        | 49 ++++++++++--
 .../apache/spark/sql/hive/parquetSuites.scala   | 51 +++++++++++-
 4 files changed, 150 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/70d9839c/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala
index 8f8138d..e0ead23 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala
@@ -33,6 +33,10 @@ private[sql] case class Partition(values: Row, path: String)
 
 private[sql] case class PartitionSpec(partitionColumns: StructType, partitions: Seq[Partition])
 
+private[sql] object PartitionSpec {
+  val emptySpec = PartitionSpec(StructType(Seq.empty[StructField]), Seq.empty[Partition])
+}
+
 private[sql] object PartitioningUtils {
   // This duplicates default value of Hive `ConfVars.DEFAULTPARTITIONNAME`, since sql/core doesn't
   // depend on Hive.
@@ -68,20 +72,37 @@ private[sql] object PartitioningUtils {
   private[sql] def parsePartitions(
       paths: Seq[Path],
       defaultPartitionName: String): PartitionSpec = {
-    val partitionValues = resolvePartitions(paths.flatMap(parsePartition(_, defaultPartitionName)))
-    val fields = {
-      val (PartitionValues(columnNames, literals)) = partitionValues.head
-      columnNames.zip(literals).map { case (name, Literal(_, dataType)) =>
-        StructField(name, dataType, nullable = true)
-      }
+    // First, we need to parse every partition's path and see if we can find partition values.
+    val pathsWithPartitionValues = paths.flatMap { path =>
+      parsePartition(path, defaultPartitionName).map(path -> _)
     }
 
-    val partitions = partitionValues.zip(paths).map {
-      case (PartitionValues(_, literals), path) =>
-        Partition(Row(literals.map(_.value): _*), path.toString)
-    }
+    if (pathsWithPartitionValues.isEmpty) {
+      // This dataset is not partitioned.
+      PartitionSpec.emptySpec
+    } else {
+      // This dataset is partitioned. We need to check whether all partitions have the same
+      // partition columns and resolve potential type conflicts.
+      val resolvedPartitionValues = resolvePartitions(pathsWithPartitionValues.map(_._2))
+
+      // Creates the StructType which represents the partition columns.
+      val fields = {
+        val PartitionValues(columnNames, literals) = resolvedPartitionValues.head
+        columnNames.zip(literals).map { case (name, Literal(_, dataType)) =>
+          // We always assume partition columns are nullable since we've no idea whether null values
+          // will be appended in the future.
+          StructField(name, dataType, nullable = true)
+        }
+      }
+
+      // Finally, we create `Partition`s based on paths and resolved partition values.
+      val partitions = resolvedPartitionValues.zip(pathsWithPartitionValues).map {
+        case (PartitionValues(_, literals), (path, _)) =>
+          Partition(Row.fromSeq(literals.map(_.value)), path.toString)
+      }
 
-    PartitionSpec(StructType(fields), partitions)
+      PartitionSpec(StructType(fields), partitions)
+    }
   }
 
   /**
@@ -111,7 +132,7 @@ private[sql] object PartitioningUtils {
     while (!finished) {
       // Sometimes (e.g., when speculative task is enabled), temporary directories may be left
       // uncleaned.  Here we simply ignore them.
-      if (chopped.getName == "_temporary") {
+      if (chopped.getName.toLowerCase == "_temporary") {
         return None
       }
 
@@ -121,8 +142,12 @@ private[sql] object PartitioningUtils {
       finished = maybeColumn.isEmpty || chopped.getParent == null
     }
 
-    val (columnNames, values) = columns.reverse.unzip
-    Some(PartitionValues(columnNames, values))
+    if (columns.isEmpty) {
+      None
+    } else {
+      val (columnNames, values) = columns.reverse.unzip
+      Some(PartitionValues(columnNames, values))
+    }
   }
 
   private def parsePartitionColumn(
@@ -156,20 +181,25 @@ private[sql] object PartitioningUtils {
   private[sql] def resolvePartitions(values: Seq[PartitionValues]): Seq[PartitionValues] = {
     // Column names of all partitions must match
     val distinctPartitionsColNames = values.map(_.columnNames).distinct
-    assert(distinctPartitionsColNames.size == 1, {
-      val list = distinctPartitionsColNames.mkString("\t", "\n", "")
-      s"Conflicting partition column names detected:\n$list"
-    })
-
-    // Resolves possible type conflicts for each column
-    val columnCount = values.head.columnNames.size
-    val resolvedValues = (0 until columnCount).map { i =>
-      resolveTypeConflicts(values.map(_.literals(i)))
-    }
 
-    // Fills resolved literals back to each partition
-    values.zipWithIndex.map { case (d, index) =>
-      d.copy(literals = resolvedValues.map(_(index)))
+    if (distinctPartitionsColNames.isEmpty) {
+      Seq.empty
+    } else {
+      assert(distinctPartitionsColNames.size == 1, {
+        val list = distinctPartitionsColNames.mkString("\t", "\n", "")
+        s"Conflicting partition column names detected:\n$list"
+      })
+
+      // Resolves possible type conflicts for each column
+      val columnCount = values.head.columnNames.size
+      val resolvedValues = (0 until columnCount).map { i =>
+        resolveTypeConflicts(values.map(_.literals(i)))
+      }
+
+      // Fills resolved literals back to each partition
+      values.zipWithIndex.map { case (d, index) =>
+        d.copy(literals = resolvedValues.map(_(index)))
+      }
     }
   }
 

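For reference, a quick sketch of how parsePartition is expected to behave after
the changes above (paths and results mirror the test cases added in
ParquetPartitionDiscoverySuite below; illustrative only, not exact test code):

    // `_temporary` directories are now skipped case-insensitively, and a path
    // containing no `column=value` segments yields None rather than an empty
    // PartitionValues.
    parsePartition(new Path("file:///path/_temporary/c=1.5"), defaultPartitionName)  // None
    parsePartition(new Path("file:///path1"), defaultPartitionName)                  // None
    parsePartition(new Path("file://path/a=10"), defaultPartitionName)
    // roughly Some(PartitionValues(Seq("a"), Seq(Literal.create(10, IntegerType))))
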
http://git-wip-us.apache.org/repos/asf/spark/blob/70d9839c/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 6a917bf..fcbac0d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -462,12 +462,7 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
 
   private def discoverPartitions(): PartitionSpec = {
     val leafDirs = fileStatusCache.leafDirs.keys.toSeq
-
-    if (leafDirs.nonEmpty) {
-      PartitioningUtils.parsePartitions(leafDirs, PartitioningUtils.DEFAULT_PARTITION_NAME)
-    } else {
-      PartitionSpec(StructType(Array.empty[StructField]), Array.empty[Partition])
-    }
+    PartitioningUtils.parsePartitions(leafDirs, PartitioningUtils.DEFAULT_PARTITION_NAME)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/70d9839c/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
index 1927114..907dbb0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
@@ -22,7 +22,7 @@ import org.apache.hadoop.fs.Path
 
 import org.apache.spark.sql.catalyst.expressions.Literal
 import org.apache.spark.sql.sources.PartitioningUtils._
-import org.apache.spark.sql.sources.{Partition, PartitionSpec}
+import org.apache.spark.sql.sources.{LogicalRelation, Partition, PartitionSpec}
 import org.apache.spark.sql.test.TestSQLContext
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.{QueryTest, Row, SQLContext}
@@ -66,12 +66,6 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest {
       assert(message.contains(expected))
     }
 
-    check("file:///", Some {
-      PartitionValues(
-        ArrayBuffer.empty[String],
-        ArrayBuffer.empty[Literal])
-    })
-
     check("file://path/a=10", Some {
       PartitionValues(
         ArrayBuffer("a"),
@@ -93,6 +87,10 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest {
         ArrayBuffer(Literal.create(1.5, FloatType)))
     })
 
+    check("file:///", None)
+    check("file:///path/_temporary", None)
+    check("file:///path/_temporary/c=1.5", None)
+    check("file:///path/_temporary/path", None)
     check("file://path/a=10/_temporary/c=1.5", None)
     check("file://path/a=10/c=1.5/_temporary", None)
 
@@ -125,6 +123,25 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest {
           Partition(Row(10.5, "hello"), "hdfs://host:9000/path/a=10.5/b=hello"))))
 
     check(Seq(
+      "hdfs://host:9000/path/_temporary",
+      "hdfs://host:9000/path/a=10/b=20",
+      "hdfs://host:9000/path/a=10.5/b=hello",
+      "hdfs://host:9000/path/a=10.5/_temporary",
+      "hdfs://host:9000/path/a=10.5/_TeMpOrArY",
+      "hdfs://host:9000/path/a=10.5/b=hello/_temporary",
+      "hdfs://host:9000/path/a=10.5/b=hello/_TEMPORARY",
+      "hdfs://host:9000/path/_temporary/path",
+      "hdfs://host:9000/path/a=11/_temporary/path",
+      "hdfs://host:9000/path/a=10.5/b=world/_temporary/path"),
+      PartitionSpec(
+        StructType(Seq(
+          StructField("a", FloatType),
+          StructField("b", StringType))),
+        Seq(
+          Partition(Row(10, "20"), "hdfs://host:9000/path/a=10/b=20"),
+          Partition(Row(10.5, "hello"), "hdfs://host:9000/path/a=10.5/b=hello"))))
+
+    check(Seq(
       s"hdfs://host:9000/path/a=10/b=20",
       s"hdfs://host:9000/path/a=$defaultPartitionName/b=hello"),
       PartitionSpec(
@@ -145,6 +162,11 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest {
         Seq(
           Partition(Row(10, null), s"hdfs://host:9000/path/a=10/b=$defaultPartitionName"),
           Partition(Row(10.5, null), s"hdfs://host:9000/path/a=10.5/b=$defaultPartitionName"))))
+
+    check(Seq(
+      s"hdfs://host:9000/path1",
+      s"hdfs://host:9000/path2"),
+      PartitionSpec.emptySpec)
   }
 
   test("read partitioned table - normal case") {
@@ -334,4 +356,17 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest {
       }
     }
   }
+
+  test("SPARK-7749 Non-partitioned table should have empty partition spec") {
+    withTempPath { dir =>
+      (1 to 10).map(i => (i, i.toString)).toDF("a", "b").write.parquet(dir.getCanonicalPath)
+      val queryExecution = read.parquet(dir.getCanonicalPath).queryExecution
+      queryExecution.analyzed.collectFirst {
+        case LogicalRelation(relation: ParquetRelation2) =>
+          assert(relation.partitionSpec === PartitionSpec.emptySpec)
+      }.getOrElse {
+        fail(s"Expecting a ParquetRelation2, but got:\n$queryExecution")
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/70d9839c/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index 05d9998..1da990b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -22,6 +22,7 @@ import java.io.File
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.sql.catalyst.expressions.Row
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.{ExecutedCommand, PhysicalRDD}
 import org.apache.spark.sql.hive.execution.HiveTableScan
 import org.apache.spark.sql.hive.test.TestHive._
@@ -29,7 +30,7 @@ import org.apache.spark.sql.hive.test.TestHive.implicits._
 import org.apache.spark.sql.parquet.{ParquetRelation2, ParquetTableScan}
 import org.apache.spark.sql.sources.{InsertIntoDataSource, InsertIntoHadoopFsRelation, LogicalRelation}
 import org.apache.spark.sql.types._
-import org.apache.spark.sql.{QueryTest, SQLConf, SaveMode}
+import org.apache.spark.sql.{DataFrame, QueryTest, SQLConf, SaveMode}
 import org.apache.spark.util.Utils
 
 // The data where the partitioning key exists only in the directory structure.
@@ -385,6 +386,54 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
     sql("DROP TABLE ms_convert")
   }
 
+  def collectParquetRelation(df: DataFrame): ParquetRelation2 = {
+    val plan = df.queryExecution.analyzed
+    plan.collectFirst {
+      case LogicalRelation(r: ParquetRelation2) => r
+    }.getOrElse {
+      fail(s"Expecting a ParquetRelation2, but got:\n$plan")
+    }
+  }
+
+  test("SPARK-7749: non-partitioned metastore Parquet table lookup should use cached relation") {
+    sql(
+      s"""CREATE TABLE nonPartitioned (
+         |  key INT,
+         |  value STRING
+         |)
+         |STORED AS PARQUET
+       """.stripMargin)
+
+    // First lookup fills the cache
+    val r1 = collectParquetRelation(table("nonPartitioned"))
+    // Second lookup should reuse the cache
+    val r2 = collectParquetRelation(table("nonPartitioned"))
+    // They should be the same instance
+    assert(r1 eq r2)
+
+    sql("DROP TABLE nonPartitioned")
+  }
+
+  test("SPARK-7749: partitioned metastore Parquet table lookup should use cached relation") {
+    sql(
+      s"""CREATE TABLE partitioned (
+         |  key INT,
+         |  value STRING
+         |)
+         |PARTITIONED BY (part INT)
+         |STORED AS PARQUET
+       """.stripMargin)
+
+    // First lookup fills the cache
+    val r1 = collectParquetRelation(table("partitioned"))
+    // Second lookup should reuse the cache
+    val r2 = collectParquetRelation(table("partitioned"))
+    // They should be the same instance
+    assert(r1 eq r2)
+
+    sql("DROP TABLE partitioned")
+  }
+
   test("Caching converted data source Parquet Relations") {
     def checkCached(tableIdentifer: catalog.QualifiedTableName): Unit = {
       // Converted test_parquet should be cached.

