Repository: spark
Updated Branches:
  refs/heads/master 147b6be3b -> 347b50106
[SPARK-7737] [SQL] Use leaf dirs having data files to discover partitions.

https://issues.apache.org/jira/browse/SPARK-7737

cc liancheng

Author: Yin Huai <[email protected]>

Closes #6329 from yhuai/spark-7737 and squashes the following commits:

7e0dfc7 [Yin Huai] Use leaf dirs having data files to discover partitions.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/347b5010
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/347b5010
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/347b5010

Branch: refs/heads/master
Commit: 347b50106bd1bcd40049f1ca29cefbb0baf53413
Parents: 147b6be
Author: Yin Huai <[email protected]>
Authored: Fri May 22 07:10:26 2015 +0800
Committer: Cheng Lian <[email protected]>
Committed: Fri May 22 07:10:26 2015 +0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/sources/interfaces.scala |  7 ++-----
 .../sql/parquet/ParquetPartitionDiscoverySuite.scala    | 10 +++++++++-
 2 files changed, 11 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/347b5010/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 61fc4e5..aaabbad 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -377,8 +377,6 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
 
     var leafDirToChildrenFiles = mutable.Map.empty[Path, Array[FileStatus]]
 
-    var leafDirs = mutable.Map.empty[Path, FileStatus]
-
     def refresh(): Unit = {
       def listLeafFilesAndDirs(fs: FileSystem, status: FileStatus): Set[FileStatus] = {
         val (dirs, files) = fs.listStatus(status.getPath).partition(_.isDir)
@@ -386,7 +384,6 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
         files.toSet ++ leafDirs ++ dirs.flatMap(dir => listLeafFilesAndDirs(fs, dir))
       }
 
-      leafDirs.clear()
       leafFiles.clear()
 
       // We don't filter files/directories like _temporary/_SUCCESS here, as specific data sources
@@ -399,7 +396,6 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
       }
 
       val (dirs, files) = statuses.partition(_.isDir)
-      leafDirs ++= dirs.map(d => d.getPath -> d).toMap
       leafFiles ++= files.map(f => f.getPath -> f).toMap
       leafDirToChildrenFiles ++= files.groupBy(_.getPath.getParent)
     }
@@ -484,7 +480,8 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
   }
 
   private def discoverPartitions(): PartitionSpec = {
-    val leafDirs = fileStatusCache.leafDirs.keys.toSeq
+    // We use leaf dirs containing data files to discover the schema.
+    val leafDirs = fileStatusCache.leafDirToChildrenFiles.keys.toSeq
     PartitioningUtils.parsePartitions(leafDirs, PartitioningUtils.DEFAULT_PARTITION_NAME)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/347b5010/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
index 907dbb0..90d4528 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
@@ -16,6 +16,8 @@
  */
 package org.apache.spark.sql.parquet
 
+import java.io.File
+
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.hadoop.fs.Path
@@ -175,11 +177,17 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest {
       pi <- Seq(1, 2)
       ps <- Seq("foo", "bar")
     } {
+      val dir = makePartitionDir(base, defaultPartitionName, "pi" -> pi, "ps" -> ps)
       makeParquetFile(
         (1 to 10).map(i => ParquetData(i, i.toString)),
-        makePartitionDir(base, defaultPartitionName, "pi" -> pi, "ps" -> ps))
+        dir)
+      // Introduce _temporary dir to test the robustness of the schema discovery process.
+      new File(dir.toString, "_temporary").mkdir()
     }
+    // Introduce _temporary dir to the base dir to test the robustness of the schema discovery process.
+    new File(base.getCanonicalPath, "_temporary").mkdir()
 
+    println("load the partitioned table")
     read.parquet(base.getCanonicalPath).registerTempTable("t")
 
     withTempTable("t") {
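
The core of the patch is the change to discoverPartitions(): candidate partition
directories are now derived from the parents of the data files themselves (the
keys of leafDirToChildrenFiles) rather than from a separately tracked set of all
leaf directories, so empty directories such as _temporary never reach
PartitioningUtils.parsePartitions. Below is a minimal, self-contained sketch of
that idea. It is an illustration only, not code from this commit: it substitutes
plain java.io.File for the Hadoop FileSystem/FileStatus API that HadoopFsRelation
actually uses, and the names LeafDirDiscoverySketch and listLeafFiles are
invented for the example.

import java.io.File
import java.nio.file.Files

object LeafDirDiscoverySketch {
  // Recursively collect all regular files under `root`. Empty directories
  // contribute no files, so they drop out without any explicit filtering.
  def listLeafFiles(root: File): Seq[File] = {
    val (dirs, files) = root.listFiles().toSeq.partition(_.isDirectory)
    files ++ dirs.flatMap(listLeafFiles)
  }

  def main(args: Array[String]): Unit = {
    val base = Files.createTempDirectory("partitioned").toFile

    // Lay out a toy partitioned table, plus the kind of empty _temporary
    // directories that previously leaked into partition discovery.
    for (pi <- Seq(1, 2); ps <- Seq("foo", "bar")) {
      val dir = new File(base, s"pi=$pi/ps=$ps")
      dir.mkdirs()
      new File(dir, "part-00000").createNewFile()
      new File(dir, "_temporary").mkdir() // empty leaf dir, no data files
    }
    new File(base, "_temporary").mkdir()

    // The analogue of leafDirToChildrenFiles: group data files by parent dir.
    // Its key set contains only directories that hold data, so the four
    // pi=.../ps=... dirs are kept and every _temporary dir is excluded.
    val leafDirToChildrenFiles = listLeafFiles(base).groupBy(_.getParentFile)
    val leafDirs = leafDirToChildrenFiles.keys.toSeq
    leafDirs.map(_.toString).sorted.foreach(println)
  }
}

Running main prints the four pi=.../ps=... directories and nothing else; feeding
only those directories to partition parsing is what keeps _temporary from being
misinterpreted as a partition.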
