Repository: spark
Updated Branches:
  refs/heads/master 4186aba63 -> 76622c661


[SPARK-16975][SQL][FOLLOWUP] Do not duplicately check file paths in data 
sources implementing FileFormat

## What changes were proposed in this pull request?

This PR removes the duplicated file-path checks in the data sources that 
implement `FileFormat`, and prevents the ORC data source from attempting to 
list files twice.

https://github.com/apache/spark/pull/14585 fixed a problem with partition 
column names starting with `_`, and that issue itself is resolved correctly. 
However, the data sources implementing `FileFormat` still appear to validate 
the paths a second time. Judging from the comment `// TODO: Move filtering.` 
in `CSVFileFormat`, this duplicate check should not be necessary.

   Currently, this filtering seems to be done in 
`PartitioningAwareFileIndex.shouldFilterOut` 
and `PartitioningAwareFileIndex.isDataPath`, so `FileFormat.inferSchema` will 
always receive leaf files. For example, running the code below:

   ``` scala
   spark.range(10).withColumn("_locality_code", 
$"id").write.partitionBy("_locality_code").save("/tmp/parquet")
   spark.read.parquet("/tmp/parquet")
   ```

   passes the paths below, which contain no directories but only valid data files,

   ``` bash
   
/tmp/parquet/_locality_code=0/part-r-00000-094a8efa-bece-4b50-b54c-7918d1f7b3f8.snappy.parquet
   
/tmp/parquet/_locality_code=1/part-r-00000-094a8efa-bece-4b50-b54c-7918d1f7b3f8.snappy.parquet
   
/tmp/parquet/_locality_code=2/part-r-00000-25de2b50-225a-4bcf-a2bc-9eb9ed407ef6.snappy.parquet
   ...
   ```

   to `FileFormat.inferSchema`.

## How was this patch tested?

A unit test was added in `HadoopFsRelationTest`; related existing tests also 
cover this change.

Author: hyukjinkwon <[email protected]>

Closes #14627 from HyukjinKwon/SPARK-16975.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/76622c66
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/76622c66
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/76622c66

Branch: refs/heads/master
Commit: 76622c661fcae81eb0352c61f54a2e9e21a4fb98
Parents: 4186aba
Author: hyukjinkwon <[email protected]>
Authored: Thu Dec 22 10:00:20 2016 -0800
Committer: Reynold Xin <[email protected]>
Committed: Thu Dec 22 10:00:20 2016 -0800

----------------------------------------------------------------------
 .../execution/datasources/csv/CSVFileFormat.scala  |  5 ++---
 .../datasources/json/JsonFileFormat.scala          |  7 +------
 .../datasources/parquet/ParquetFileFormat.scala    |  7 +------
 .../spark/sql/sources/HadoopFsRelationTest.scala   | 17 +++++++++++++++++
 4 files changed, 21 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/76622c66/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
index e627f04..b0feaeb 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
@@ -55,10 +55,9 @@ class CSVFileFormat extends TextBasedFileFormat with 
DataSourceRegister {
       options: Map[String, String],
       files: Seq[FileStatus]): Option[StructType] = {
     require(files.nonEmpty, "Cannot infer schema from an empty set of files")
-    val csvOptions = new CSVOptions(options)
 
-    // TODO: Move filtering.
-    val paths = files.filterNot(_.getPath.getName startsWith 
"_").map(_.getPath.toString)
+    val csvOptions = new CSVOptions(options)
+    val paths = files.map(_.getPath.toString)
     val lines: Dataset[String] = readText(sparkSession, csvOptions, paths)
     val firstLine: String = findFirstLine(csvOptions, lines)
     val firstRow = new CsvReader(csvOptions).parseLine(firstLine)

http://git-wip-us.apache.org/repos/asf/spark/blob/76622c66/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
index c957914..a9d8ddf 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
@@ -51,13 +51,8 @@ class JsonFileFormat extends TextBasedFileFormat with 
DataSourceRegister {
       val columnNameOfCorruptRecord =
         parsedOptions.columnNameOfCorruptRecord
           .getOrElse(sparkSession.sessionState.conf.columnNameOfCorruptRecord)
-      val jsonFiles = files.filterNot { status =>
-        val name = status.getPath.getName
-        (name.startsWith("_") && !name.contains("=")) || name.startsWith(".")
-      }.toArray
-
       val jsonSchema = InferSchema.infer(
-        createBaseRdd(sparkSession, jsonFiles),
+        createBaseRdd(sparkSession, files),
         columnNameOfCorruptRecord,
         parsedOptions)
       checkConstraints(jsonSchema)

http://git-wip-us.apache.org/repos/asf/spark/blob/76622c66/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 0965ffe..0efe6da 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -241,12 +241,7 @@ class ParquetFileFormat
       commonMetadata: Seq[FileStatus])
 
   private def splitFiles(allFiles: Seq[FileStatus]): FileTypes = {
-    // Lists `FileStatus`es of all leaf nodes (files) under all base 
directories.
-    val leaves = allFiles.filter { f =>
-      isSummaryFile(f.getPath) ||
-        !((f.getPath.getName.startsWith("_") && 
!f.getPath.getName.contains("=")) ||
-          f.getPath.getName.startsWith("."))
-    }.toArray.sortBy(_.getPath.toString)
+    val leaves = allFiles.toArray.sortBy(_.getPath.toString)
 
     FileTypes(
       data = leaves.filterNot(f => isSummaryFile(f.getPath)),

http://git-wip-us.apache.org/repos/asf/spark/blob/76622c66/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala
index 224b2c6..06566a9 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala
@@ -877,6 +877,23 @@ abstract class HadoopFsRelationTest extends QueryTest with 
SQLTestUtils with Tes
       }
     }
   }
+
+  test("SPARK-16975: Partitioned table with the column having '_' should be 
read correctly") {
+    withTempDir { dir =>
+      val childDir = new File(dir, dataSourceName).getCanonicalPath
+      val dataDf = spark.range(10).toDF()
+      val df = dataDf.withColumn("_col", $"id")
+      df.write.format(dataSourceName).partitionBy("_col").save(childDir)
+      val reader = spark.read.format(dataSourceName)
+
+      // This is needed for SimpleTextHadoopFsRelationSuite as 
SimpleTextSource needs schema.
+      if (dataSourceName == classOf[SimpleTextSource].getCanonicalName) {
+        reader.option("dataSchema", dataDf.schema.json)
+      }
+      val readBack = reader.load(childDir)
+      checkAnswer(df, readBack)
+    }
+  }
 }
 
 // This class is used to test SPARK-8578. We should not use any custom output 
committer when


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to