Repository: spark Updated Branches: refs/heads/master 4186aba63 -> 76622c661
[SPARK-16975][SQL][FOLLOWUP] Do not duplicately check file paths in data sources implementing FileFormat ## What changes were proposed in this pull request? This PR cleans up the duplicated checking of file paths in the implemented data sources and prevents attempting to list twice in the ORC data source. https://github.com/apache/spark/pull/14585 handles a problem with partition column names having `_`, and the issue itself is resolved correctly. However, it seems the data sources implementing `FileFormat` are validating the paths redundantly. Judging from the comment in `CSVFileFormat`, `// TODO: Move filtering.`, I guess we don't have to check this a second time. Currently, this seems to be filtered in `PartitioningAwareFileIndex.shouldFilterOut` and `PartitioningAwareFileIndex.isDataPath`. So, `FileFormat.inferSchema` will always receive leaf files. For example, running the code below: ``` scala spark.range(10).withColumn("_locality_code", $"id").write.partitionBy("_locality_code").save("/tmp/parquet") spark.read.parquet("/tmp/parquet") ``` gives only the paths of valid data files below, without directories: ``` bash /tmp/parquet/_col=0/part-r-00000-094a8efa-bece-4b50-b54c-7918d1f7b3f8.snappy.parquet /tmp/parquet/_col=1/part-r-00000-094a8efa-bece-4b50-b54c-7918d1f7b3f8.snappy.parquet /tmp/parquet/_col=2/part-r-00000-25de2b50-225a-4bcf-a2bc-9eb9ed407ef6.snappy.parquet ... ``` to `FileFormat.inferSchema`. ## How was this patch tested? Unit test added in `HadoopFsRelationTest` and related existing tests. Author: hyukjinkwon <[email protected]> Closes #14627 from HyukjinKwon/SPARK-16975. 
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/76622c66 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/76622c66 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/76622c66 Branch: refs/heads/master Commit: 76622c661fcae81eb0352c61f54a2e9e21a4fb98 Parents: 4186aba Author: hyukjinkwon <[email protected]> Authored: Thu Dec 22 10:00:20 2016 -0800 Committer: Reynold Xin <[email protected]> Committed: Thu Dec 22 10:00:20 2016 -0800 ---------------------------------------------------------------------- .../execution/datasources/csv/CSVFileFormat.scala | 5 ++--- .../datasources/json/JsonFileFormat.scala | 7 +------ .../datasources/parquet/ParquetFileFormat.scala | 7 +------ .../spark/sql/sources/HadoopFsRelationTest.scala | 17 +++++++++++++++++ 4 files changed, 21 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/76622c66/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala index e627f04..b0feaeb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala @@ -55,10 +55,9 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister { options: Map[String, String], files: Seq[FileStatus]): Option[StructType] = { require(files.nonEmpty, "Cannot infer schema from an empty set of files") - val csvOptions = new CSVOptions(options) - // TODO: Move filtering. 
- val paths = files.filterNot(_.getPath.getName startsWith "_").map(_.getPath.toString) + val csvOptions = new CSVOptions(options) + val paths = files.map(_.getPath.toString) val lines: Dataset[String] = readText(sparkSession, csvOptions, paths) val firstLine: String = findFirstLine(csvOptions, lines) val firstRow = new CsvReader(csvOptions).parseLine(firstLine) http://git-wip-us.apache.org/repos/asf/spark/blob/76622c66/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala index c957914..a9d8ddf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala @@ -51,13 +51,8 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister { val columnNameOfCorruptRecord = parsedOptions.columnNameOfCorruptRecord .getOrElse(sparkSession.sessionState.conf.columnNameOfCorruptRecord) - val jsonFiles = files.filterNot { status => - val name = status.getPath.getName - (name.startsWith("_") && !name.contains("=")) || name.startsWith(".") - }.toArray - val jsonSchema = InferSchema.infer( - createBaseRdd(sparkSession, jsonFiles), + createBaseRdd(sparkSession, files), columnNameOfCorruptRecord, parsedOptions) checkConstraints(jsonSchema) http://git-wip-us.apache.org/repos/asf/spark/blob/76622c66/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index 0965ffe..0efe6da 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -241,12 +241,7 @@ class ParquetFileFormat commonMetadata: Seq[FileStatus]) private def splitFiles(allFiles: Seq[FileStatus]): FileTypes = { - // Lists `FileStatus`es of all leaf nodes (files) under all base directories. - val leaves = allFiles.filter { f => - isSummaryFile(f.getPath) || - !((f.getPath.getName.startsWith("_") && !f.getPath.getName.contains("=")) || - f.getPath.getName.startsWith(".")) - }.toArray.sortBy(_.getPath.toString) + val leaves = allFiles.toArray.sortBy(_.getPath.toString) FileTypes( data = leaves.filterNot(f => isSummaryFile(f.getPath)), http://git-wip-us.apache.org/repos/asf/spark/blob/76622c66/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala index 224b2c6..06566a9 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala @@ -877,6 +877,23 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes } } } + + test("SPARK-16975: Partitioned table with the column having '_' should be read correctly") { + withTempDir { dir => + val childDir = new File(dir, dataSourceName).getCanonicalPath + val dataDf = spark.range(10).toDF() + val df = dataDf.withColumn("_col", $"id") + df.write.format(dataSourceName).partitionBy("_col").save(childDir) + val reader = spark.read.format(dataSourceName) + + // 
This is needed for SimpleTextHadoopFsRelationSuite as SimpleTextSource needs schema. + if (dataSourceName == classOf[SimpleTextSource].getCanonicalName) { + reader.option("dataSchema", dataDf.schema.json) + } + val readBack = reader.load(childDir) + checkAnswer(df, readBack) + } + } } // This class is used to test SPARK-8578. We should not use any custom output committer when --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
