Repository: spark
Updated Branches:
refs/heads/branch-2.0 d90359d63 -> 689b0fc81
[SPARK-14993][SQL] Fix Partition Discovery Inconsistency when Input is a Path
to Parquet File
#### What changes were proposed in this pull request?
When loading a dataset from the path ```/path/a=1```, `a` is not treated as a
partitioning column. However, when loading from
```/path/a=1/file.parquet```, `a` is treated as a partitioning column and
shows up in the schema.
This PR fixes that inconsistency.
The base paths are the set of paths treated as the base directories of the
input datasets. Partition discovery stops as soon as it reaches any base
path.
By default, the dataset paths provided by the user are the base paths.
Below are three typical cases:
**Case 1** ```sqlContext.read.parquet("/path/something=true/")```: the base path
is `/path/something=true/`, and the returned DataFrame does not contain a
`something` column.
**Case 2** ```sqlContext.read.parquet("/path/something=true/a.parquet")```: the
base path is still `/path/something=true/`, and the returned DataFrame again
does not contain a `something` column.
**Case 3** ```sqlContext.read.parquet("/path/")```: the base path is `/path/`,
and the returned DataFrame has a `something` column.
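The default rule behind the three cases above can be sketched as follows. This is a minimal illustration, not Spark's implementation: it models paths as plain strings, and `DefaultBasePath`, `normalize`, `parent`, and `basePath` are hypothetical names introduced here. The only behavior taken from this change is that an input path pointing at a leaf file resolves to its parent directory, while a directory is its own base path.

```scala
// Hypothetical helper for illustration only; Spark works on Hadoop Path objects.
object DefaultBasePath {
  /** Strips trailing slashes so "/path/a=1/" and "/path/a=1" compare equal. */
  def normalize(path: String): String = {
    val trimmed = path.reverse.dropWhile(_ == '/').reverse
    if (trimmed.isEmpty) "/" else trimmed
  }

  /** Parent directory of an already normalized path. */
  def parent(path: String): String = {
    val idx = path.lastIndexOf('/')
    if (idx <= 0) "/" else path.substring(0, idx)
  }

  /** A leaf file resolves to its parent directory; a directory is its own base path. */
  def basePath(input: String, leafFiles: Set[String]): String = {
    val p = normalize(input)
    if (leafFiles.contains(p)) parent(p) else p
  }
}
```

With `leafFiles = Set("/path/something=true/a.parquet")`, Cases 1 and 2 both resolve to `/path/something=true`, and Case 3 resolves to `/path`.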
Users can also override the base path by setting `basePath` in the options,
which passes the new base path to the data source. For example, with
```sqlContext.read.option("basePath", "/path/").parquet("/path/something=true/")```
the returned DataFrame has a `something` column.
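To see why the base path decides which columns appear, the sketch below walks upward from a leaf directory and collects `name=value` segments until it reaches the base path. It is a simplified model under stated assumptions, not Spark's `parsePartition`: paths are plain strings, values stay as strings (no type inference), the walk ends at the first directory that is not of the form `name=value`, and `PartitionWalk` and `partitionColumns` are hypothetical names.

```scala
// Simplified model for illustration only; not Spark's partition discovery code.
object PartitionWalk {
  /** Strips trailing slashes so "/path/" and "/path" compare equal. */
  def normalize(path: String): String = {
    val trimmed = path.reverse.dropWhile(_ == '/').reverse
    if (trimmed.isEmpty) "/" else trimmed
  }

  /** Collects partition columns between basePath and leafDir, outermost first. */
  def partitionColumns(leafDir: String, basePath: String): List[(String, String)] = {
    val base = normalize(basePath)

    @annotation.tailrec
    def loop(current: String, acc: List[(String, String)]): List[(String, String)] = {
      if (current == base || current == "/") acc // reached the base path: stop
      else {
        val idx = current.lastIndexOf('/')
        val name = current.substring(idx + 1)
        val parentDir = if (idx <= 0) "/" else current.substring(0, idx)
        name.split("=", 2) match {
          case Array(col, value) if col.nonEmpty && value.nonEmpty =>
            loop(parentDir, (col, value) :: acc)
          case _ => acc // assumption: a non-partition directory ends the walk
        }
      }
    }

    loop(normalize(leafDir), Nil)
  }
}
```

Calling `PartitionWalk.partitionColumns("/path/something=true/", "/path/something=true/")` yields no columns, whereas passing `"/path/"` as the base path yields the `something` column, mirroring the `basePath` override described above.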
The related PRs:
- https://github.com/apache/spark/pull/9651
- https://github.com/apache/spark/pull/10211
#### How was this patch tested?
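Added two test cases to `ParquetPartitionDiscoverySuite`: "parse partition with
base paths" and "read partitioned table using different path options".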
Author: gatorsmile <[email protected]>
Author: xiaoli <[email protected]>
Author: Xiao Li <[email protected]>
Closes #12828 from gatorsmile/readPartitionedTable.
(cherry picked from commit ef55e46c9225ddceebeaf19398519cbe651c1728)
Signed-off-by: Yin Huai <[email protected]>
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/689b0fc8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/689b0fc8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/689b0fc8
Branch: refs/heads/branch-2.0
Commit: 689b0fc819a16d6013bd4bfec2063354841a916e
Parents: d90359d
Author: gatorsmile <[email protected]>
Authored: Wed May 4 18:47:27 2016 -0700
Committer: Yin Huai <[email protected]>
Committed: Wed May 4 18:47:37 2016 -0700
----------------------------------------------------------------------
.../PartitioningAwareFileCatalog.scala | 42 +++++++++-----
.../ParquetPartitionDiscoverySuite.scala | 60 ++++++++++++++++++++
2 files changed, 88 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/689b0fc8/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
----------------------------------------------------------------------
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
index 9d997d6..2c44b39 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
@@ -133,23 +133,37 @@ abstract class PartitioningAwareFileCatalog(
/**
* Contains a set of paths that are considered as the base dirs of the input
datasets.
* The partitioning discovery logic will make sure it will stop when it
reaches any
- * base path. By default, the paths of the dataset provided by users will be
base paths.
- * For example, if a user uses
`sqlContext.read.parquet("/path/something=true/")`, the base path
- * will be `/path/something=true/`, and the returned DataFrame will not
contain a column of
- * `something`. If users want to override the basePath. They can set
`basePath` in the options
- * to pass the new base path to the data source.
- * For the above example, if the user-provided base path is `/path/`, the
returned
+ * base path.
+ *
+ * By default, the paths of the dataset provided by users will be base paths.
+ * Below are three typical examples,
+ * Case 1) `sqlContext.read.parquet("/path/something=true/")`: the base path
will be
+ * `/path/something=true/`, and the returned DataFrame will not contain a
column of `something`.
+ * Case 2) `sqlContext.read.parquet("/path/something=true/a.parquet")`: the
base path will be
+ * still `/path/something=true/`, and the returned DataFrame will also not
contain a column of
+ * `something`.
+ * Case 3) `sqlContext.read.parquet("/path/")`: the base path will be
`/path/`, and the returned
* DataFrame will have the column of `something`.
+ *
+ * Users also can override the basePath by setting `basePath` in the options
to pass the new base
+ * path to the data source.
+ * For example, `sqlContext.read.option("basePath",
"/path/").parquet("/path/something=true/")`,
+ * and the returned DataFrame will have the column of `something`.
*/
private def basePaths: Set[Path] = {
- val userDefinedBasePath = parameters.get("basePath").map(basePath =>
Set(new Path(basePath)))
- userDefinedBasePath.getOrElse {
- // If the user does not provide basePath, we will just use paths.
- paths.toSet
- }.map { hdfsPath =>
- // Make the path qualified (consistent with listLeafFiles and
listLeafFilesInParallel).
- val fs = hdfsPath.getFileSystem(hadoopConf)
- hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
+ parameters.get("basePath").map(new Path(_)) match {
+ case Some(userDefinedBasePath) =>
+ val fs = userDefinedBasePath.getFileSystem(hadoopConf)
+ if (!fs.isDirectory(userDefinedBasePath)) {
+ throw new IllegalArgumentException("Option 'basePath' must be a
directory")
+ }
+ Set(fs.makeQualified(userDefinedBasePath))
+
+ case None =>
+ paths.map { path =>
+ // Make the path qualified (consistent with listLeafFiles and
listLeafFilesInParallel).
+ val qualifiedPath =
path.getFileSystem(hadoopConf).makeQualified(path)
+ if (leafFiles.contains(qualifiedPath)) qualifiedPath.getParent else
qualifiedPath }.toSet
}
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/689b0fc8/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
----------------------------------------------------------------------
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
index 5bffb30..cb2c252 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -191,6 +191,29 @@ class ParquetPartitionDiscoverySuite extends QueryTest
with ParquetTest with Sha
checkThrows[AssertionError]("file://path/a=", "Empty partition column
value")
}
+ test("parse partition with base paths") {
+ // when the basePaths is the same as the path to a leaf directory
+ val partitionSpec1: Option[PartitionValues] = parsePartition(
+ path = new Path("file://path/a=10"),
+ defaultPartitionName = defaultPartitionName,
+ typeInference = true,
+ basePaths = Set(new Path("file://path/a=10")))._1
+
+ assert(partitionSpec1.isEmpty)
+
+ // when the basePaths is the path to a base directory of leaf directories
+ val partitionSpec2: Option[PartitionValues] = parsePartition(
+ path = new Path("file://path/a=10"),
+ defaultPartitionName = defaultPartitionName,
+ typeInference = true,
+ basePaths = Set(new Path("file://path")))._1
+
+ assert(partitionSpec2 ==
+ Option(PartitionValues(
+ ArrayBuffer("a"),
+ ArrayBuffer(Literal.create(10, IntegerType)))))
+ }
+
test("parse partitions") {
def check(
paths: Seq[String],
@@ -413,6 +436,43 @@ class ParquetPartitionDiscoverySuite extends QueryTest
with ParquetTest with Sha
}
}
+ test("read partitioned table using different path options") {
+ withTempDir { base =>
+ val pi = 1
+ val ps = "foo"
+ val path = makePartitionDir(base, defaultPartitionName, "pi" -> pi, "ps"
-> ps)
+ makeParquetFile(
+ (1 to 10).map(i => ParquetData(i, i.toString)), path)
+
+ // when the input is the base path containing partitioning directories
+ val baseDf = sqlContext.read.parquet(base.getCanonicalPath)
+ assert(baseDf.schema.map(_.name) === Seq("intField", "stringField",
"pi", "ps"))
+
+ // when the input is a path to the leaf directory containing a parquet
file
+ val partDf = sqlContext.read.parquet(path.getCanonicalPath)
+ assert(partDf.schema.map(_.name) === Seq("intField", "stringField"))
+
+ path.listFiles().foreach { f =>
+ if (f.getName.toLowerCase().endsWith(".parquet")) {
+ // when the input is a path to a parquet file
+ val df = sqlContext.read.parquet(f.getCanonicalPath)
+ assert(df.schema.map(_.name) === Seq("intField", "stringField"))
+ }
+ }
+
+ path.listFiles().foreach { f =>
+ if (f.getName.toLowerCase().endsWith(".parquet")) {
+ // when the input is a path to a parquet file but `basePath` is
overridden to
+ // the base path containing partitioning directories
+ val df = sqlContext
+ .read.option("basePath", base.getCanonicalPath)
+ .parquet(f.getCanonicalPath)
+ assert(df.schema.map(_.name) === Seq("intField", "stringField",
"pi", "ps"))
+ }
+ }
+ }
+ }
+
test("read partitioned table - partition key included in Parquet file") {
withTempDir { base =>
for {