Repository: spark
Updated Branches:
  refs/heads/branch-2.0 d5bd64a20 -> 1e13d09c5


[SPARK-14959][SQL] handle partitioned table directories in distributed filesystem

## What changes were proposed in this pull request?
##### The root cause:
When `DataSource.resolveRelation` tries to build a `ListingFileCatalog` 
object, `listLeafFiles` is invoked, and a list of `FileStatus` objects is 
retrieved from the provided path. These `FileStatus` objects include the 
directories for the partitions (id=0 and id=2 in the JIRA). However, 
`getFileBlockLocations` is also invoked on these directory `FileStatus` 
objects, and `DistributedFileSystem` does not allow that call on a directory, 
hence the exception happens.
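
For illustration, a minimal sketch of the failing pattern (the table path below is hypothetical, and this assumes a JVM with the Hadoop client configured against HDFS); on `DistributedFileSystem`, requesting block locations for a directory fails, which is where the reported exception comes from:
```
// Sketch only: hypothetical partitioned-table path on HDFS.
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

val tablePath = new Path("hdfs:///user/spark/SPARK-14959_part")
val fs = tablePath.getFileSystem(new Configuration())

// listStatus on the table root returns the partition directories
// (e.g. id=0/, id=1/) as FileStatus objects alongside any data files.
fs.listStatus(tablePath).foreach { f =>
  // DistributedFileSystem rejects this call for a directory FileStatus.
  fs.getFileBlockLocations(f, 0, f.getLen)
}
```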

This PR removes the block of code that invokes `getFileBlockLocations` for 
every `FileStatus` object of the provided path. Instead, we call 
`HadoopFsRelation.listLeafFiles` directly, because this utility method filters 
out directories before calling `getFileBlockLocations` to generate the 
`LocatedFileStatus` objects.
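
For illustration only, a condensed sketch of that pattern (the real `HadoopFsRelation.listLeafFiles` also applies the `PathFilter` and `shouldFilterOut` name filtering, which this sketch omits):
```
import org.apache.hadoop.fs.{FileStatus, FileSystem, LocatedFileStatus}

// Recursively expand directories so only plain files remain.
def leafFiles(fs: FileSystem, status: FileStatus): Seq[FileStatus] =
  if (status.isDirectory) {
    fs.listStatus(status.getPath).toSeq.flatMap(leafFiles(fs, _))
  } else {
    Seq(status)
  }

// Block locations are requested for leaf files only, never for directories.
def locatedLeafFiles(fs: FileSystem, root: FileStatus): Seq[FileStatus] =
  leafFiles(fs, root).map {
    case f: LocatedFileStatus => f
    case f => new LocatedFileStatus(f, fs.getFileBlockLocations(f, 0, f.getLen))
  }
```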

## How was this patch tested?
Regression tests were run. Manual test:
```
scala> spark.read.format("parquet").load("hdfs://bdavm009.svl.ibm.com:8020/user/spark/SPARK-14959_part").show
+-----+---+
| text| id|
+-----+---+
|hello|  0|
|world|  0|
|hello|  1|
|there|  1|
+-----+---+

       
spark.read.format("orc").load("hdfs://bdavm009.svl.ibm.com:8020/user/spark/SPARK-14959_orc").show
+-----+---+
| text| id|
+-----+---+
|hello|  0|
|world|  0|
|hello|  1|
|there|  1|
+-----+---+
```
I also tried it with two levels of partitioning.
I have not found a way to add a test case to the unit test bucket that 
exercises a real HDFS file location. Any suggestions would be appreciated.
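
One possibility, along the lines of what this patch adds to `LocalityTestFileSystem` in `FileSourceStrategySuite` (see the diff below), is a local file system that mimics HDFS by rejecting directory lookups, so a regression fails fast without a real cluster. A sketch with a hypothetical class name:
```
import org.apache.hadoop.fs.{BlockLocation, FileStatus, RawLocalFileSystem}

// Hypothetical test-only file system: behaves like RawLocalFileSystem but,
// like HDFS, refuses to return block locations for directories.
class DirectoryRejectingFileSystem extends RawLocalFileSystem {
  override def getFileBlockLocations(
      file: FileStatus, start: Long, len: Long): Array[BlockLocation] = {
    require(!file.isDirectory, "The file path can not be a directory.")
    super.getFileBlockLocations(file, start, len)
  }
}
```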

Author: Xin Wu <xi...@us.ibm.com>

Closes #13463 from xwu0226/SPARK-14959.

(cherry picked from commit 76aa45d359d034e9ccaac64b36738d47e1e42f2c)
Signed-off-by: Cheng Lian <l...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1e13d09c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1e13d09c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1e13d09c

Branch: refs/heads/branch-2.0
Commit: 1e13d09c526ec37c344950b07d938751bbd6fd0a
Parents: d5bd64a
Author: Xin Wu <xi...@us.ibm.com>
Authored: Thu Jun 2 22:49:17 2016 -0700
Committer: Cheng Lian <l...@databricks.com>
Committed: Thu Jun 2 22:49:24 2016 -0700

----------------------------------------------------------------------
 .../datasources/ListingFileCatalog.scala        | 36 ++------------------
 .../datasources/fileSourceInterfaces.scala      | 10 ++++++
 .../datasources/FileSourceStrategySuite.scala   |  1 +
 3 files changed, 14 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1e13d09c/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
index 644e5d6..dd3c96a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
@@ -83,40 +83,10 @@ class ListingFileCatalog(
       val statuses: Seq[FileStatus] = paths.flatMap { path =>
         val fs = path.getFileSystem(hadoopConf)
         logInfo(s"Listing $path on driver")
-
-        val statuses = {
-          val stats = Try(fs.listStatus(path)).getOrElse(Array.empty[FileStatus])
-          if (pathFilter != null) stats.filter(f => pathFilter.accept(f.getPath)) else stats
-        }
-
-        statuses.map {
-          case f: LocatedFileStatus => f
-
-          // NOTE:
-          //
-          // - Although S3/S3A/S3N file system can be quite slow for remote file metadata
-          //   operations, calling `getFileBlockLocations` does no harm here since these file system
-          //   implementations don't actually issue RPC for this method.
-          //
-          // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should a
-          //   a big deal since we always use to `listLeafFilesInParallel` when the number of paths
-          //   exceeds threshold.
-          case f =>
-            HadoopFsRelation.createLocatedFileStatus(f, fs.getFileBlockLocations(f, 0, f.getLen))
-        }
-      }.filterNot { status =>
-        val name = status.getPath.getName
-        HadoopFsRelation.shouldFilterOut(name)
-      }
-
-      val (dirs, files) = statuses.partition(_.isDirectory)
-
-      // It uses [[LinkedHashSet]] since the order of files can affect the results. (SPARK-11500)
-      if (dirs.isEmpty) {
-        mutable.LinkedHashSet(files: _*)
-      } else {
-        mutable.LinkedHashSet(files: _*) ++ listLeafFiles(dirs.map(_.getPath))
+        Try(HadoopFsRelation.listLeafFiles(fs, fs.getFileStatus(path), pathFilter)).
+          getOrElse(Array.empty)
       }
+      mutable.LinkedHashSet(statuses: _*)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/1e13d09c/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
index e0e569b..7f3eed3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
@@ -381,6 +381,16 @@ private[sql] object HadoopFsRelation extends Logging {
       }
       statuses.filterNot(status => shouldFilterOut(status.getPath.getName)).map {
         case f: LocatedFileStatus => f
+
+        // NOTE:
+        //
+        // - Although S3/S3A/S3N file system can be quite slow for remote file metadata
+        //   operations, calling `getFileBlockLocations` does no harm here since these file system
+        //   implementations don't actually issue RPC for this method.
+        //
+        // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should not
+        //   be a big deal since we always use to `listLeafFilesInParallel` when the number of
+        //   paths exceeds threshold.
        case f => createLocatedFileStatus(f, fs.getFileBlockLocations(f, 0, f.getLen))
       }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/1e13d09c/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index 52dda8c..25f1443 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -490,6 +490,7 @@ class LocalityTestFileSystem extends RawLocalFileSystem {
 
   override def getFileBlockLocations(
       file: FileStatus, start: Long, len: Long): Array[BlockLocation] = {
+    require(!file.isDirectory, "The file path can not be a directory.")
     val count = invocations.getAndAdd(1)
    Array(new BlockLocation(Array(s"host$count:50010"), Array(s"host$count"), 0, len))
   }

