Repository: spark
Updated Branches:
  refs/heads/master 231f39e3f -> 4ef39c2f4


[SPARK-17974] (try 2) Refactor FileCatalog classes to simplify the inheritance 
tree

## What changes were proposed in this pull request?

This renames `BasicFileCatalog => FileCatalog`, combines `SessionFileCatalog` 
with `PartitioningAwareFileCatalog`, and removes the old `FileCatalog` trait.

In summary,
```
MetadataLogFileCatalog extends PartitioningAwareFileCatalog
ListingFileCatalog extends PartitioningAwareFileCatalog
PartitioningAwareFileCatalog extends FileCatalog
TableFileCatalog extends FileCatalog
```

(note that this is a re-submission of 
https://github.com/apache/spark/pull/15518 which got reverted)

## How was this patch tested?

Existing tests

Author: Eric Liang <e...@databricks.com>

Closes #15533 from ericl/fix-scalastyle-revert.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4ef39c2f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4ef39c2f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4ef39c2f

Branch: refs/heads/master
Commit: 4ef39c2f4436fa22d0b957fe7ad477e4c4a16452
Parents: 231f39e
Author: Eric Liang <e...@databricks.com>
Authored: Tue Oct 18 13:33:46 2016 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Tue Oct 18 13:33:46 2016 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/Dataset.scala    |   2 +-
 .../sql/execution/DataSourceScanExec.scala      |   4 +-
 .../sql/execution/datasources/FileCatalog.scala |  66 ++++++
 .../sql/execution/datasources/FileFormat.scala  |  61 -----
 .../datasources/HadoopFsRelation.scala          |   4 +-
 .../PartitioningAwareFileCatalog.scala          | 217 +++++++++++++++++-
 .../datasources/PartitioningUtils.scala         |  12 +-
 .../datasources/SessionFileCatalog.scala        | 225 -------------------
 .../datasources/TableFileCatalog.scala          |  11 +-
 .../datasources/FileCatalogSuite.scala          |  10 +
 .../datasources/SessionFileCatalogSuite.scala   |  34 ---
 .../ParquetPartitionDiscoverySuite.scala        |  10 +-
 .../spark/sql/hive/HiveMetastoreCatalog.scala   |   2 +-
 13 files changed, 304 insertions(+), 354 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4ef39c2f/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 7dccbbd..073d2b1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -43,7 +43,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.util.usePrettyExpression
 import org.apache.spark.sql.execution.{FileRelation, LogicalRDD, 
QueryExecution, SQLExecution}
 import org.apache.spark.sql.execution.command.{CreateViewCommand, 
ExplainCommand, GlobalTempView, LocalTempView}
-import org.apache.spark.sql.execution.datasources.{FileCatalog, 
HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation}
 import org.apache.spark.sql.execution.datasources.json.JacksonGenerator
 import org.apache.spark.sql.execution.python.EvaluatePython
 import org.apache.spark.sql.streaming.{DataStreamWriter, StreamingQuery}

http://git-wip-us.apache.org/repos/asf/spark/blob/4ef39c2f/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 623d2be..fdd1fa3 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -431,7 +431,7 @@ case class FileSourceScanExec(
   private def createBucketedReadRDD(
       bucketSpec: BucketSpec,
       readFile: (PartitionedFile) => Iterator[InternalRow],
-      selectedPartitions: Seq[Partition],
+      selectedPartitions: Seq[PartitionDirectory],
       fsRelation: HadoopFsRelation): RDD[InternalRow] = {
     logInfo(s"Planning with ${bucketSpec.numBuckets} buckets")
     val bucketed =
@@ -463,7 +463,7 @@ case class FileSourceScanExec(
    */
   private def createNonBucketedReadRDD(
       readFile: (PartitionedFile) => Iterator[InternalRow],
-      selectedPartitions: Seq[Partition],
+      selectedPartitions: Seq[PartitionDirectory],
       fsRelation: HadoopFsRelation): RDD[InternalRow] = {
     val defaultMaxSplitBytes =
       fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes

http://git-wip-us.apache.org/repos/asf/spark/blob/4ef39c2f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCatalog.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCatalog.scala
new file mode 100644
index 0000000..2bc66ce
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCatalog.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.hadoop.fs._
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+
+/**
+ * A collection of data files from a partitioned relation, along with the 
partition values in the
+ * form of an [[InternalRow]].
+ */
+case class PartitionDirectory(values: InternalRow, files: Seq[FileStatus])
+
+/**
+ * An interface for objects capable of enumerating the root paths of a 
relation as well as the
+ * partitions of a relation subject to some pruning expressions.
+ */
+trait FileCatalog {
+
+  /**
+   * Returns the list of root input paths from which the catalog will get 
files. There may be a
+   * single root path from which partitions are discovered, or individual 
partitions may be
+   * specified by each path.
+   */
+  def rootPaths: Seq[Path]
+
+  /**
+   * Returns all valid files grouped into partitions when the data is 
partitioned. If the data is
+   * unpartitioned, this will return a single partition with no partition 
values.
+   *
+   * @param filters The filters used to prune which partitions are returned.  
These filters must
+   *                only refer to partition columns and this method will only 
return files
+   *                where these predicates are guaranteed to evaluate to 
`true`.  Thus, these
+   *                filters will not need to be evaluated again on the 
returned data.
+   */
+  def listFiles(filters: Seq[Expression]): Seq[PartitionDirectory]
+
+  /**
+   * Returns the list of files that will be read when scanning this relation. 
This call may be
+   * very expensive for large tables.
+   */
+  def inputFiles: Array[String]
+
+  /** Refresh any cached file listings */
+  def refresh(): Unit
+
+  /** Sum of table file sizes, in bytes */
+  def sizeInBytes: Long
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/4ef39c2f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
index e7239ef..9d153ce 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
@@ -175,64 +175,3 @@ abstract class TextBasedFileFormat extends FileFormat {
     codec == null || codec.isInstanceOf[SplittableCompressionCodec]
   }
 }
-
-/**
- * A collection of data files from a partitioned relation, along with the 
partition values in the
- * form of an [[InternalRow]].
- */
-case class Partition(values: InternalRow, files: Seq[FileStatus])
-
-/**
- * An interface for objects capable of enumerating the root paths of a 
relation as well as the
- * partitions of a relation subject to some pruning expressions.
- */
-trait BasicFileCatalog {
-
-  /**
-   * Returns the list of root input paths from which the catalog will get 
files. There may be a
-   * single root path from which partitions are discovered, or individual 
partitions may be
-   * specified by each path.
-   */
-  def rootPaths: Seq[Path]
-
-  /**
-   * Returns all valid files grouped into partitions when the data is 
partitioned. If the data is
-   * unpartitioned, this will return a single partition with no partition 
values.
-   *
-   * @param filters The filters used to prune which partitions are returned.  
These filters must
-   *                only refer to partition columns and this method will only 
return files
-   *                where these predicates are guaranteed to evaluate to 
`true`.  Thus, these
-   *                filters will not need to be evaluated again on the 
returned data.
-   */
-  def listFiles(filters: Seq[Expression]): Seq[Partition]
-
-  /** Returns the list of files that will be read when scanning this relation. 
*/
-  def inputFiles: Array[String]
-
-  /** Refresh any cached file listings */
-  def refresh(): Unit
-
-  /** Sum of table file sizes, in bytes */
-  def sizeInBytes: Long
-}
-
-/**
- * A [[BasicFileCatalog]] which can enumerate all of the files comprising a 
relation and, from
- * those, infer the relation's partition specification.
- */
-// TODO: Consider a more descriptive, appropriate name which suggests this is 
a file catalog for
-// which it is safe to list all of its files?
-trait FileCatalog extends BasicFileCatalog {
-
-  /** Returns the specification of the partitions inferred from the data. */
-  def partitionSpec(): PartitionSpec
-
-  /** Returns all the valid files. */
-  def allFiles(): Seq[FileStatus]
-
-  /** Returns the list of files that will be read when scanning this relation. 
*/
-  override def inputFiles: Array[String] =
-    allFiles().map(_.getPath.toUri.toString).toArray
-
-  override def sizeInBytes: Long = allFiles().map(_.getLen).sum
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/4ef39c2f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
index db889ed..afad889 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.types.StructType
  * Acts as a container for all of the metadata required to read from a 
datasource. All discovery,
  * resolution and merging logic for schemas and partitions has been removed.
  *
- * @param location A [[BasicFileCatalog]] that can enumerate the locations of 
all the files that
+ * @param location A [[FileCatalog]] that can enumerate the locations of all 
the files that
  *                 comprise this relation.
  * @param partitionSchema The schema of the columns (if any) that are used to 
partition the relation
  * @param dataSchema The schema of any remaining columns.  Note that if any 
partition columns are
@@ -38,7 +38,7 @@ import org.apache.spark.sql.types.StructType
  * @param options Configuration used when reading / writing data.
  */
 case class HadoopFsRelation(
-    location: BasicFileCatalog,
+    location: FileCatalog,
     partitionSchema: StructType,
     dataSchema: StructType,
     bucketSpec: Option[BucketSpec],

http://git-wip-us.apache.org/repos/asf/spark/blob/4ef39c2f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
index b250811..5c8eff7 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
@@ -17,14 +17,21 @@
 
 package org.apache.spark.sql.execution.datasources
 
+import java.io.FileNotFoundException
+
 import scala.collection.mutable
 
-import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs._
+import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
 
+import org.apache.spark.internal.Logging
+import org.apache.spark.metrics.source.HiveCatalogMetrics
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.{expressions, InternalRow}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.types.{StringType, StructType}
+import org.apache.spark.util.SerializableConfiguration
 
 
 /**
@@ -38,22 +45,24 @@ import org.apache.spark.sql.types.{StringType, StructType}
 abstract class PartitioningAwareFileCatalog(
     sparkSession: SparkSession,
     parameters: Map[String, String],
-    partitionSchema: Option[StructType])
-  extends SessionFileCatalog(sparkSession) with FileCatalog {
+    partitionSchema: Option[StructType]) extends FileCatalog with Logging {
   import PartitioningAwareFileCatalog.BASE_PATH_PARAM
 
-  override protected val hadoopConf = 
sparkSession.sessionState.newHadoopConfWithOptions(parameters)
+  /** Returns the specification of the partitions inferred from the data. */
+  def partitionSpec(): PartitionSpec
+
+  protected val hadoopConf = 
sparkSession.sessionState.newHadoopConfWithOptions(parameters)
 
   protected def leafFiles: mutable.LinkedHashMap[Path, FileStatus]
 
   protected def leafDirToChildrenFiles: Map[Path, Array[FileStatus]]
 
-  override def listFiles(filters: Seq[Expression]): Seq[Partition] = {
+  override def listFiles(filters: Seq[Expression]): Seq[PartitionDirectory] = {
     val selectedPartitions = if (partitionSpec().partitionColumns.isEmpty) {
-      Partition(InternalRow.empty, allFiles().filter(f => 
isDataPath(f.getPath))) :: Nil
+      PartitionDirectory(InternalRow.empty, allFiles().filter(f => 
isDataPath(f.getPath))) :: Nil
     } else {
       prunePartitions(filters, partitionSpec()).map {
-        case PartitionDirectory(values, path) =>
+        case PartitionPath(values, path) =>
           val files: Seq[FileStatus] = leafDirToChildrenFiles.get(path) match {
             case Some(existingDir) =>
               // Directory has children files in it, return them
@@ -63,14 +72,20 @@ abstract class PartitioningAwareFileCatalog(
               // Directory does not exist, or has no children files
               Nil
           }
-          Partition(values, files)
+          PartitionDirectory(values, files)
       }
     }
     logTrace("Selected files after partition pruning:\n\t" + 
selectedPartitions.mkString("\n\t"))
     selectedPartitions
   }
 
-  override def allFiles(): Seq[FileStatus] = {
+  /** Returns the list of files that will be read when scanning this relation. 
*/
+  override def inputFiles: Array[String] =
+    allFiles().map(_.getPath.toUri.toString).toArray
+
+  override def sizeInBytes: Long = allFiles().map(_.getLen).sum
+
+  def allFiles(): Seq[FileStatus] = {
     if (partitionSpec().partitionColumns.isEmpty) {
       // For each of the root input paths, get the list of files inside them
       rootPaths.flatMap { path =>
@@ -139,7 +154,7 @@ abstract class PartitioningAwareFileCatalog(
 
   private def prunePartitions(
       predicates: Seq[Expression],
-      partitionSpec: PartitionSpec): Seq[PartitionDirectory] = {
+      partitionSpec: PartitionSpec): Seq[PartitionPath] = {
     val PartitionSpec(partitionColumns, partitions) = partitionSpec
     val partitionColumnNames = partitionColumns.map(_.name).toSet
     val partitionPruningPredicates = predicates.filter {
@@ -156,7 +171,7 @@ abstract class PartitioningAwareFileCatalog(
       })
 
       val selected = partitions.filter {
-        case PartitionDirectory(values, _) => boundPredicate(values)
+        case PartitionPath(values, _) => boundPredicate(values)
       }
       logInfo {
         val total = partitions.length
@@ -214,8 +229,186 @@ abstract class PartitioningAwareFileCatalog(
     val name = path.getName
     !((name.startsWith("_") && !name.contains("=")) || name.startsWith("."))
   }
+
+  /**
+   * List leaf files of given paths. This method will submit a Spark job to do 
parallel
+   * listing whenever the number of paths reaches the parallel partition 
discovery
+   * threshold.
+   *
+   * This is publicly visible for testing.
+   */
+  def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = {
+    val files =
+      if (paths.length >= 
sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
+        PartitioningAwareFileCatalog.listLeafFilesInParallel(paths, 
hadoopConf, sparkSession)
+      } else {
+        PartitioningAwareFileCatalog.listLeafFilesInSerial(paths, hadoopConf)
+      }
+
+    HiveCatalogMetrics.incrementFilesDiscovered(files.size)
+    mutable.LinkedHashSet(files: _*)
+  }
 }
 
-object PartitioningAwareFileCatalog {
+object PartitioningAwareFileCatalog extends Logging {
   val BASE_PATH_PARAM = "basePath"
+
+  /** A serializable variant of HDFS's BlockLocation. */
+  private case class SerializableBlockLocation(
+      names: Array[String],
+      hosts: Array[String],
+      offset: Long,
+      length: Long)
+
+  /** A serializable variant of HDFS's FileStatus. */
+  private case class SerializableFileStatus(
+      path: String,
+      length: Long,
+      isDir: Boolean,
+      blockReplication: Short,
+      blockSize: Long,
+      modificationTime: Long,
+      accessTime: Long,
+      blockLocations: Array[SerializableBlockLocation])
+
+  /**
+   * List a collection of paths recursively.
+   */
+  private def listLeafFilesInSerial(
+      paths: Seq[Path],
+      hadoopConf: Configuration): Seq[FileStatus] = {
+    // Dummy jobconf to get to the pathFilter defined in configuration
+    val jobConf = new JobConf(hadoopConf, this.getClass)
+    val filter = FileInputFormat.getInputPathFilter(jobConf)
+
+    paths.flatMap { path =>
+      val fs = path.getFileSystem(hadoopConf)
+      listLeafFiles0(fs, path, filter)
+    }
+  }
+
+  /**
+   * List a collection of paths recursively in parallel (using Spark executors).
+   * Each task launched will use [[listLeafFilesInSerial]] to list.
+   */
+  private def listLeafFilesInParallel(
+      paths: Seq[Path],
+      hadoopConf: Configuration,
+      sparkSession: SparkSession): Seq[FileStatus] = {
+    assert(paths.size >= 
sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold)
+    logInfo(s"Listing leaf files and directories in parallel under: 
${paths.mkString(", ")}")
+
+    val sparkContext = sparkSession.sparkContext
+    val serializableConfiguration = new SerializableConfiguration(hadoopConf)
+    val serializedPaths = paths.map(_.toString)
+
+    // Cap the parallelism to prevent the following file listing from 
generating many tasks
+    // in case of large #defaultParallelism.
+    val numParallelism = Math.min(paths.size, 10000)
+
+    val statuses = sparkContext
+      .parallelize(serializedPaths, numParallelism)
+      .mapPartitions { paths =>
+        val hadoopConf = serializableConfiguration.value
+        listLeafFilesInSerial(paths.map(new Path(_)).toSeq, 
hadoopConf).iterator
+      }.map { status =>
+        // Turn FileStatus into SerializableFileStatus so we can send it back 
to the driver
+        val blockLocations = status match {
+          case f: LocatedFileStatus =>
+            f.getBlockLocations.map { loc =>
+              SerializableBlockLocation(
+                loc.getNames,
+                loc.getHosts,
+                loc.getOffset,
+                loc.getLength)
+            }
+
+          case _ =>
+            Array.empty[SerializableBlockLocation]
+        }
+
+        SerializableFileStatus(
+          status.getPath.toString,
+          status.getLen,
+          status.isDirectory,
+          status.getReplication,
+          status.getBlockSize,
+          status.getModificationTime,
+          status.getAccessTime,
+          blockLocations)
+      }.collect()
+
+    // Turn SerializableFileStatus back to Status
+    statuses.map { f =>
+      val blockLocations = f.blockLocations.map { loc =>
+        new BlockLocation(loc.names, loc.hosts, loc.offset, loc.length)
+      }
+      new LocatedFileStatus(
+        new FileStatus(
+          f.length, f.isDir, f.blockReplication, f.blockSize, 
f.modificationTime, new Path(f.path)),
+        blockLocations)
+    }
+  }
+
+  /**
+   * List the leaf files under a single path, provided as a Path, in serial.
+   */
+  private def listLeafFiles0(
+      fs: FileSystem, path: Path, filter: PathFilter): Seq[FileStatus] = {
+    logTrace(s"Listing $path")
+    val name = path.getName.toLowerCase
+    if (shouldFilterOut(name)) {
+      Seq.empty[FileStatus]
+    } else {
+      // [SPARK-17599] Prevent ListingFileCatalog from failing if path doesn't 
exist
+      // Note that statuses only include FileStatus for the files and dirs 
directly under path,
+      // and does not include anything else recursively.
+      val statuses = try fs.listStatus(path) catch {
+        case _: FileNotFoundException =>
+          logWarning(s"The directory $path was not found. Was it deleted very 
recently?")
+          Array.empty[FileStatus]
+      }
+
+      val allLeafStatuses = {
+        val (dirs, files) = statuses.partition(_.isDirectory)
+        val stats = files ++ dirs.flatMap(dir => listLeafFiles0(fs, 
dir.getPath, filter))
+        if (filter != null) stats.filter(f => filter.accept(f.getPath)) else 
stats
+      }
+
+      allLeafStatuses.filterNot(status => 
shouldFilterOut(status.getPath.getName)).map {
+        case f: LocatedFileStatus =>
+          f
+
+        // NOTE:
+        //
+        // - Although S3/S3A/S3N file system can be quite slow for remote file 
metadata
+        //   operations, calling `getFileBlockLocations` does no harm here 
since these file system
+        //   implementations don't actually issue RPC for this method.
+        //
+        // - Here we are calling `getFileBlockLocations` in a sequential 
manner, but it should not
+        //   be a big deal since we always use `listLeafFilesInParallel` 
when the number of
+        //   paths exceeds the threshold.
+        case f =>
+          // The other constructor of LocatedFileStatus will call 
FileStatus.getPermission(),
+          // which is very slow on some file systems (RawLocalFileSystem, which 
launches a
+          // subprocess and parses the stdout).
+          val locations = fs.getFileBlockLocations(f, 0, f.getLen)
+          val lfs = new LocatedFileStatus(f.getLen, f.isDirectory, 
f.getReplication, f.getBlockSize,
+            f.getModificationTime, 0, null, null, null, null, f.getPath, 
locations)
+          if (f.isSymlink) {
+            lfs.setSymlink(f.getSymlink)
+          }
+          lfs
+      }
+    }
+  }
+
+  /** Checks if we should filter out this path name. */
+  def shouldFilterOut(pathName: String): Boolean = {
+    // We filter everything that starts with _ and ., except _common_metadata 
and _metadata
+    // because Parquet needs to find those metadata files from leaf files 
returned by this method.
+    // We should refactor this logic to not mix metadata files with data files.
+    ((pathName.startsWith("_") && !pathName.contains("=")) || 
pathName.startsWith(".")) &&
+      !pathName.startsWith("_common_metadata") && 
!pathName.startsWith("_metadata")
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/4ef39c2f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index 381261c..81bdabb 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -34,8 +34,8 @@ import org.apache.spark.sql.types._
 
 // TODO: We should tighten up visibility of the classes here once we clean up 
Hive coupling.
 
-object PartitionDirectory {
-  def apply(values: InternalRow, path: String): PartitionDirectory =
+object PartitionPath {
+  def apply(values: InternalRow, path: String): PartitionPath =
     apply(values, new Path(path))
 }
 
@@ -43,14 +43,14 @@ object PartitionDirectory {
  * Holds a directory in a partitioned collection of files as well as as the 
partition values
  * in the form of a Row.  Before scanning, the files at `path` need to be 
enumerated.
  */
-case class PartitionDirectory(values: InternalRow, path: Path)
+case class PartitionPath(values: InternalRow, path: Path)
 
 case class PartitionSpec(
     partitionColumns: StructType,
-    partitions: Seq[PartitionDirectory])
+    partitions: Seq[PartitionPath])
 
 object PartitionSpec {
-  val emptySpec = PartitionSpec(StructType(Seq.empty[StructField]), 
Seq.empty[PartitionDirectory])
+  val emptySpec = PartitionSpec(StructType(Seq.empty[StructField]), 
Seq.empty[PartitionPath])
 }
 
 object PartitioningUtils {
@@ -142,7 +142,7 @@ object PartitioningUtils {
       // Finally, we create `Partition`s based on paths and resolved partition 
values.
       val partitions = 
resolvedPartitionValues.zip(pathsWithPartitionValues).map {
         case (PartitionValues(_, literals), (path, _)) =>
-          PartitionDirectory(InternalRow.fromSeq(literals.map(_.value)), path)
+          PartitionPath(InternalRow.fromSeq(literals.map(_.value)), path)
       }
 
       PartitionSpec(StructType(fields), partitions)

http://git-wip-us.apache.org/repos/asf/spark/blob/4ef39c2f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SessionFileCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SessionFileCatalog.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SessionFileCatalog.scala
deleted file mode 100644
index 4807a92..0000000
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SessionFileCatalog.scala
+++ /dev/null
@@ -1,225 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.datasources
-
-import java.io.FileNotFoundException
-
-import scala.collection.mutable
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs._
-import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
-
-import org.apache.spark.internal.Logging
-import org.apache.spark.metrics.source.HiveCatalogMetrics
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.util.SerializableConfiguration
-
-
-/**
- * A base class for [[BasicFileCatalog]]s that need a [[SparkSession]] and the 
ability to find leaf
- * files in a list of HDFS paths.
- *
- * @param sparkSession a [[SparkSession]]
- * @param ignoreFileNotFound (see [[ListingFileCatalog]])
- */
-abstract class SessionFileCatalog(sparkSession: SparkSession)
-    extends BasicFileCatalog with Logging {
-  protected val hadoopConf: Configuration
-
-  /**
-   * List leaf files of given paths. This method will submit a Spark job to do 
parallel
-   * listing whenever the number of paths reaches the parallel partition 
discovery
-   * threshold.
-   *
-   * This is publicly visible for testing.
-   */
-  def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = {
-    val files =
-      if (paths.length >= 
sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
-        SessionFileCatalog.listLeafFilesInParallel(paths, hadoopConf, 
sparkSession)
-      } else {
-        SessionFileCatalog.listLeafFilesInSerial(paths, hadoopConf)
-      }
-
-    HiveCatalogMetrics.incrementFilesDiscovered(files.size)
-    mutable.LinkedHashSet(files: _*)
-  }
-}
-
-object SessionFileCatalog extends Logging {
-
-  /** A serializable variant of HDFS's BlockLocation. */
-  private case class SerializableBlockLocation(
-      names: Array[String],
-      hosts: Array[String],
-      offset: Long,
-      length: Long)
-
-  /** A serializable variant of HDFS's FileStatus. */
-  private case class SerializableFileStatus(
-      path: String,
-      length: Long,
-      isDir: Boolean,
-      blockReplication: Short,
-      blockSize: Long,
-      modificationTime: Long,
-      accessTime: Long,
-      blockLocations: Array[SerializableBlockLocation])
-
-  /**
-   * List a collection of paths recursively.
-   */
-  private def listLeafFilesInSerial(
-      paths: Seq[Path],
-      hadoopConf: Configuration): Seq[FileStatus] = {
-    // Dummy jobconf to get to the pathFilter defined in configuration
-    val jobConf = new JobConf(hadoopConf, this.getClass)
-    val filter = FileInputFormat.getInputPathFilter(jobConf)
-
-    paths.flatMap { path =>
-      val fs = path.getFileSystem(hadoopConf)
-      listLeafFiles0(fs, path, filter)
-    }
-  }
-
-  /**
-   * List a collection of paths recursively in parallel (using Spark executors).
-   * Each task launched will use [[listLeafFilesInSerial]] to list.
-   */
-  private def listLeafFilesInParallel(
-      paths: Seq[Path],
-      hadoopConf: Configuration,
-      sparkSession: SparkSession): Seq[FileStatus] = {
-    assert(paths.size >= 
sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold)
-    logInfo(s"Listing leaf files and directories in parallel under: 
${paths.mkString(", ")}")
-
-    val sparkContext = sparkSession.sparkContext
-    val serializableConfiguration = new SerializableConfiguration(hadoopConf)
-    val serializedPaths = paths.map(_.toString)
-
-    // Cap the parallelism to prevent the following file listing from 
generating too many tasks
-    // in case of a large #defaultParallelism.
-    val numParallelism = Math.min(paths.size, 10000)
-
-    val statuses = sparkContext
-      .parallelize(serializedPaths, numParallelism)
-      .mapPartitions { paths =>
-        val hadoopConf = serializableConfiguration.value
-        listLeafFilesInSerial(paths.map(new Path(_)).toSeq, 
hadoopConf).iterator
-      }.map { status =>
-        // Turn FileStatus into SerializableFileStatus so we can send it back 
to the driver
-        val blockLocations = status match {
-          case f: LocatedFileStatus =>
-            f.getBlockLocations.map { loc =>
-              SerializableBlockLocation(
-                loc.getNames,
-                loc.getHosts,
-                loc.getOffset,
-                loc.getLength)
-            }
-
-          case _ =>
-            Array.empty[SerializableBlockLocation]
-        }
-
-        SerializableFileStatus(
-          status.getPath.toString,
-          status.getLen,
-          status.isDirectory,
-          status.getReplication,
-          status.getBlockSize,
-          status.getModificationTime,
-          status.getAccessTime,
-          blockLocations)
-      }.collect()
-
-    // Turn SerializableFileStatus back to FileStatus
-    statuses.map { f =>
-      val blockLocations = f.blockLocations.map { loc =>
-        new BlockLocation(loc.names, loc.hosts, loc.offset, loc.length)
-      }
-      new LocatedFileStatus(
-        new FileStatus(
-          f.length, f.isDir, f.blockReplication, f.blockSize, 
f.modificationTime, new Path(f.path)),
-        blockLocations)
-    }
-  }
-
-  /**
-   * List a single path, provided as a Path, recursively and in serial.
-   */
-  private def listLeafFiles0(
-      fs: FileSystem, path: Path, filter: PathFilter): Seq[FileStatus] = {
-    logTrace(s"Listing $path")
-    val name = path.getName.toLowerCase
-    if (shouldFilterOut(name)) {
-      Seq.empty[FileStatus]
-    } else {
-      // [SPARK-17599] Prevent ListingFileCatalog from failing if path doesn't 
exist
-      // Note that statuses only include FileStatus for the files and dirs 
directly under path,
-      // and does not include anything else recursively.
-      val statuses = try fs.listStatus(path) catch {
-        case _: FileNotFoundException =>
-          logWarning(s"The directory $path was not found. Was it deleted very 
recently?")
-          Array.empty[FileStatus]
-      }
-
-      val allLeafStatuses = {
-        val (dirs, files) = statuses.partition(_.isDirectory)
-        val stats = files ++ dirs.flatMap(dir => listLeafFiles0(fs, 
dir.getPath, filter))
-        if (filter != null) stats.filter(f => filter.accept(f.getPath)) else 
stats
-      }
-
-      allLeafStatuses.filterNot(status => 
shouldFilterOut(status.getPath.getName)).map {
-        case f: LocatedFileStatus =>
-          f
-
-        // NOTE:
-        //
-        // - Although S3/S3A/S3N file system can be quite slow for remote file 
metadata
-        //   operations, calling `getFileBlockLocations` does no harm here 
since these file system
-        //   implementations don't actually issue RPC for this method.
-        //
-        // - Here we are calling `getFileBlockLocations` in a sequential 
manner, but it should not
-        //   be a big deal since we always use `listLeafFilesInParallel` 
when the number of
-        //   paths exceeds threshold.
-        case f =>
-          // The other constructor of LocatedFileStatus will call 
FileStatus.getPermission(),
-          // which is very slow on some file systems (RawLocalFileSystem, which 
launches a
-          // subprocess and parses the stdout).
-          val locations = fs.getFileBlockLocations(f, 0, f.getLen)
-          val lfs = new LocatedFileStatus(f.getLen, f.isDirectory, 
f.getReplication, f.getBlockSize,
-            f.getModificationTime, 0, null, null, null, null, f.getPath, 
locations)
-          if (f.isSymlink) {
-            lfs.setSymlink(f.getSymlink)
-          }
-          lfs
-      }
-    }
-  }
-
-  /** Checks if we should filter out this path name. */
-  def shouldFilterOut(pathName: String): Boolean = {
-    // We filter everything that starts with _ and ., except _common_metadata 
and _metadata
-    // because Parquet needs to find those metadata files from leaf files 
returned by this method.
-    // We should refactor this logic to not mix metadata files with data files.
-    ((pathName.startsWith("_") && !pathName.contains("=")) || 
pathName.startsWith(".")) &&
-      !pathName.startsWith("_common_metadata") && 
!pathName.startsWith("_metadata")
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/4ef39c2f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala
index a5c41b2..5648ab4 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.types.StructType
 
 
 /**
- * A [[BasicFileCatalog]] for a metastore catalog table.
+ * A [[FileCatalog]] for a metastore catalog table.
  *
  * @param sparkSession a [[SparkSession]]
  * @param db the table's database name
@@ -38,10 +38,9 @@ class TableFileCatalog(
     db: String,
     table: String,
     partitionSchema: Option[StructType],
-    override val sizeInBytes: Long)
-  extends SessionFileCatalog(sparkSession) {
+    override val sizeInBytes: Long) extends FileCatalog {
 
-  override protected val hadoopConf = sparkSession.sessionState.newHadoopConf
+  protected val hadoopConf = sparkSession.sessionState.newHadoopConf
 
   private val externalCatalog = sparkSession.sharedState.externalCatalog
 
@@ -51,7 +50,7 @@ class TableFileCatalog(
 
   override def rootPaths: Seq[Path] = baseLocation.map(new Path(_)).toSeq
 
-  override def listFiles(filters: Seq[Expression]): Seq[Partition] = {
+  override def listFiles(filters: Seq[Expression]): Seq[PartitionDirectory] = {
     filterPartitions(filters).listFiles(Nil)
   }
 
@@ -79,7 +78,7 @@ class TableFileCatalog(
       case Some(schema) =>
         val selectedPartitions = externalCatalog.listPartitionsByFilter(db, 
table, filters)
         val partitions = selectedPartitions.map { p =>
-          PartitionDirectory(p.toRow(schema), p.storage.locationUri.get)
+          PartitionPath(p.toRow(schema), p.storage.locationUri.get)
         }
         val partitionSpec = PartitionSpec(schema, partitions)
         new PrunedTableFileCatalog(

http://git-wip-us.apache.org/repos/asf/spark/blob/4ef39c2f/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala
index 2695974..9c43169 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala
@@ -81,6 +81,16 @@ class FileCatalogSuite extends SharedSQLContext {
     }
   }
 
+  test("PartitioningAwareFileCatalog - file filtering") {
+    assert(!PartitioningAwareFileCatalog.shouldFilterOut("abcd"))
+    assert(PartitioningAwareFileCatalog.shouldFilterOut(".ab"))
+    assert(PartitioningAwareFileCatalog.shouldFilterOut("_cd"))
+    assert(!PartitioningAwareFileCatalog.shouldFilterOut("_metadata"))
+    assert(!PartitioningAwareFileCatalog.shouldFilterOut("_common_metadata"))
+    assert(PartitioningAwareFileCatalog.shouldFilterOut("_ab_metadata"))
+    assert(PartitioningAwareFileCatalog.shouldFilterOut("_cd_common_metadata"))
+  }
+
   test("SPARK-17613 - PartitioningAwareFileCatalog: base path w/o '/' at end") 
{
     class MockCatalog(
       override val rootPaths: Seq[Path])

http://git-wip-us.apache.org/repos/asf/spark/blob/4ef39c2f/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SessionFileCatalogSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SessionFileCatalogSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SessionFileCatalogSuite.scala
deleted file mode 100644
index df50958..0000000
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SessionFileCatalogSuite.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.datasources
-
-import org.apache.spark.SparkFunSuite
-
-class SessionFileCatalogSuite extends SparkFunSuite {
-
-  test("file filtering") {
-    assert(!SessionFileCatalog.shouldFilterOut("abcd"))
-    assert(SessionFileCatalog.shouldFilterOut(".ab"))
-    assert(SessionFileCatalog.shouldFilterOut("_cd"))
-
-    assert(!SessionFileCatalog.shouldFilterOut("_metadata"))
-    assert(!SessionFileCatalog.shouldFilterOut("_common_metadata"))
-    assert(SessionFileCatalog.shouldFilterOut("_ab_metadata"))
-    assert(SessionFileCatalog.shouldFilterOut("_cd_common_metadata"))
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/4ef39c2f/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
index 2ef66ba..f2a209e 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -30,7 +30,8 @@ import org.apache.parquet.hadoop.ParquetOutputFormat
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Literal
-import org.apache.spark.sql.execution.datasources.{FileCatalog, 
HadoopFsRelation, LogicalRelation, PartitionDirectory => Partition, 
PartitioningUtils, PartitionSpec}
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.datasources.{PartitionPath => Partition}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
@@ -632,10 +633,11 @@ class ParquetPartitionDiscoverySuite extends QueryTest 
with ParquetTest with Sha
       (1 to 10).map(i => (i, i.toString)).toDF("a", 
"b").write.parquet(dir.getCanonicalPath)
       val queryExecution = 
spark.read.parquet(dir.getCanonicalPath).queryExecution
       queryExecution.analyzed.collectFirst {
-        case LogicalRelation(HadoopFsRelation(location: FileCatalog, _, _, _, 
_, _), _, _) =>
-          assert(location.partitionSpec === PartitionSpec.emptySpec)
+        case LogicalRelation(
+            HadoopFsRelation(location: PartitioningAwareFileCatalog, _, _, _, 
_, _), _, _) =>
+          assert(location.partitionSpec() === PartitionSpec.emptySpec)
       }.getOrElse {
-        fail(s"Expecting a ParquetRelation2, but got:\n$queryExecution")
+        fail(s"Expecting a matching HadoopFsRelation, but 
got:\n$queryExecution")
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/4ef39c2f/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 4a2aaa7..16e1e37 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.execution.command.DDLUtils
-import org.apache.spark.sql.execution.datasources.{Partition => _, _}
+import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, 
ParquetOptions}
 import org.apache.spark.sql.hive.orc.OrcFileFormat
 import org.apache.spark.sql.types._


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to