Repository: spark
Updated Branches:
refs/heads/master 6dde27404 -> 76aa45d35
[SPARK-14959][SQL] handle partitioned table directories in distributed
filesystem
## What changes were proposed in this pull request?
##### The root cause:
When `DataSource.resolveRelation` tries to build a `ListingFileCatalog`
object, `listLeafFiles` is invoked to retrieve a list of `FileStatus` objects
for the provided path. These `FileStatus` objects include the partition
directories (id=0 and id=2 in the JIRA). However, `getFileBlockLocations` is
then invoked on these directory `FileStatus` objects as well, and
`DistributedFileSystem` does not allow that call on directories, hence the exception.
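A minimal sketch of the failing pattern (the namenode host and table path are
hypothetical; id=0/, id=1/ stand for partition directories):
```
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Hypothetical table location containing partition directories id=0/, id=1/.
val tablePath = new Path("hdfs://namenode:8020/user/spark/table")
val fs = tablePath.getFileSystem(new Configuration())

fs.listStatus(tablePath).foreach { f =>
  // When `f` is a partition directory, DistributedFileSystem rejects this
  // call, which is the exception reported in the JIRA.
  fs.getFileBlockLocations(f, 0, f.getLen)
}
```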
This PR removes the block of code that invokes `getFileBlockLocations` for
every `FileStatus` object of the provided path. Instead, we call
`HadoopFsRelation.listLeafFiles` directly, because this utility method filters
out directories before calling `getFileBlockLocations` to generate
`LocatedFileStatus` objects.
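A condensed, hypothetical sketch of that filter-before-locate pattern (the
real `HadoopFsRelation.listLeafFiles` in the diff below also applies the
`pathFilter` and `shouldFilterOut` checks, omitted here):
```
import org.apache.hadoop.fs.{FileStatus, FileSystem, LocatedFileStatus}

// Directories are recursed into; only plain files ever reach
// getFileBlockLocations, so partition directories can no longer blow up.
def leafFiles(fs: FileSystem, status: FileStatus): Seq[FileStatus] = {
  if (status.isDirectory) {
    fs.listStatus(status.getPath).toSeq.flatMap(leafFiles(fs, _))
  } else {
    val blocks = fs.getFileBlockLocations(status, 0, status.getLen)
    Seq(new LocatedFileStatus(status, blocks))
  }
}
```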
## How was this patch tested?
Regression tests were run. Manual test:
```
scala> spark.read.format("parquet").load("hdfs://bdavm009.svl.ibm.com:8020/user/spark/SPARK-14959_part").show
+-----+---+
| text| id|
+-----+---+
|hello| 0|
|world| 0|
|hello| 1|
|there| 1|
+-----+---+
spark.read.format("orc").load("hdfs://bdavm009.svl.ibm.com:8020/user/spark/SPARK-14959_orc").show
+-----+---+
| text| id|
+-----+---+
|hello| 0|
|world| 0|
|hello| 1|
|there| 1|
+-----+---+
```
I also tried it with two levels of partitioning.
I have not found a way to add a test case to the unit test suites that
exercises a real HDFS file location. Any suggestions will be appreciated.
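One possible direction, offered only as a sketch: mirror the `require` guard
this patch adds to `LocalityTestFileSystem` in a `RawLocalFileSystem` subclass
(class name hypothetical), register it under a test scheme via the Hadoop
`fs.<scheme>.impl` configuration key, and read a partitioned table through it,
so the test fails whenever a directory `FileStatus` reaches
`getFileBlockLocations`:
```
import org.apache.hadoop.fs.{BlockLocation, FileStatus, RawLocalFileSystem}

// Hypothetical test-only file system: behaves like the local FS but fails
// fast, as HDFS does, if a directory ever reaches getFileBlockLocations.
class DirectoryRejectingFileSystem extends RawLocalFileSystem {
  override def getFileBlockLocations(
      file: FileStatus, start: Long, len: Long): Array[BlockLocation] = {
    require(!file.isDirectory, "The file path can not be a directory.")
    super.getFileBlockLocations(file, start, len)
  }
}
```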
Author: Xin Wu <[email protected]>
Closes #13463 from xwu0226/SPARK-14959.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/76aa45d3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/76aa45d3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/76aa45d3
Branch: refs/heads/master
Commit: 76aa45d359d034e9ccaac64b36738d47e1e42f2c
Parents: 6dde274
Author: Xin Wu <[email protected]>
Authored: Thu Jun 2 22:49:17 2016 -0700
Committer: Cheng Lian <[email protected]>
Committed: Thu Jun 2 22:49:17 2016 -0700
----------------------------------------------------------------------
.../datasources/ListingFileCatalog.scala | 36 ++------------------
.../datasources/fileSourceInterfaces.scala | 10 ++++++
.../datasources/FileSourceStrategySuite.scala | 1 +
3 files changed, 14 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/76aa45d3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
index 644e5d6..dd3c96a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
@@ -83,40 +83,10 @@ class ListingFileCatalog(
       val statuses: Seq[FileStatus] = paths.flatMap { path =>
         val fs = path.getFileSystem(hadoopConf)
         logInfo(s"Listing $path on driver")
-
-        val statuses = {
-          val stats = Try(fs.listStatus(path)).getOrElse(Array.empty[FileStatus])
-          if (pathFilter != null) stats.filter(f => pathFilter.accept(f.getPath)) else stats
-        }
-
-        statuses.map {
-          case f: LocatedFileStatus => f
-
-          // NOTE:
-          //
-          // - Although S3/S3A/S3N file system can be quite slow for remote file metadata
-          //   operations, calling `getFileBlockLocations` does no harm here since these file system
-          //   implementations don't actually issue RPC for this method.
-          //
-          // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should a
-          //   a big deal since we always use to `listLeafFilesInParallel` when the number of paths
-          //   exceeds threshold.
-          case f =>
-            HadoopFsRelation.createLocatedFileStatus(f, fs.getFileBlockLocations(f, 0, f.getLen))
-        }
-      }.filterNot { status =>
-        val name = status.getPath.getName
-        HadoopFsRelation.shouldFilterOut(name)
-      }
-
-      val (dirs, files) = statuses.partition(_.isDirectory)
-
-      // It uses [[LinkedHashSet]] since the order of files can affect the results. (SPARK-11500)
-      if (dirs.isEmpty) {
-        mutable.LinkedHashSet(files: _*)
-      } else {
-        mutable.LinkedHashSet(files: _*) ++ listLeafFiles(dirs.map(_.getPath))
+        Try(HadoopFsRelation.listLeafFiles(fs, fs.getFileStatus(path), pathFilter)).
+          getOrElse(Array.empty)
       }
+      mutable.LinkedHashSet(statuses: _*)
     }
   }
http://git-wip-us.apache.org/repos/asf/spark/blob/76aa45d3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
index e0e569b..7f3eed3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
@@ -381,6 +381,16 @@ private[sql] object HadoopFsRelation extends Logging {
     }
     statuses.filterNot(status => shouldFilterOut(status.getPath.getName)).map {
       case f: LocatedFileStatus => f
+
+      // NOTE:
+      //
+      // - Although S3/S3A/S3N file system can be quite slow for remote file metadata
+      //   operations, calling `getFileBlockLocations` does no harm here since these file system
+      //   implementations don't actually issue RPC for this method.
+      //
+      // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should not
+      //   be a big deal since we always use `listLeafFilesInParallel` when the number of
+      //   paths exceeds threshold.
       case f => createLocatedFileStatus(f, fs.getFileBlockLocations(f, 0, f.getLen))
     }
   }
http://git-wip-us.apache.org/repos/asf/spark/blob/76aa45d3/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index 52dda8c..25f1443 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -490,6 +490,7 @@ class LocalityTestFileSystem extends RawLocalFileSystem {
   override def getFileBlockLocations(
       file: FileStatus, start: Long, len: Long): Array[BlockLocation] = {
+    require(!file.isDirectory, "The file path can not be a directory.")
     val count = invocations.getAndAdd(1)
     Array(new BlockLocation(Array(s"host$count:50010"), Array(s"host$count"), 0, len))
   }