Repository: spark Updated Branches: refs/heads/master 883f3aff6 -> b461acb2d
[SPARK-25134][SQL] Csv column pruning with checking of headers throws incorrect error ## What changes were proposed in this pull request? When column pruning is turned on the checking of headers in the csv should only be for the fields in the requiredSchema, not the dataSchema, because column pruning means only requiredSchema is read. ## How was this patch tested? Added 2 unit tests where column pruning is turned on/off and csv headers are checked against schema Please review http://spark.apache.org/contributing.html before opening a pull request. Closes #22123 from koertkuipers/feat-csv-column-pruning-and-check-header. Authored-by: Koert Kuipers <ko...@tresata.com> Signed-off-by: hyukjinkwon <gurwls...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b461acb2 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b461acb2 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b461acb2 Branch: refs/heads/master Commit: b461acb2d90b734393c27fe7b359e2f2d297b8d4 Parents: 883f3af Author: Koert Kuipers <ko...@tresata.com> Authored: Tue Aug 21 10:23:55 2018 +0800 Committer: hyukjinkwon <gurwls...@apache.org> Committed: Tue Aug 21 10:23:55 2018 +0800 ---------------------------------------------------------------------- .../org/apache/spark/sql/DataFrameReader.scala | 7 ++-- .../datasources/csv/CSVDataSource.scala | 40 +++++++------------- .../datasources/csv/CSVFileFormat.scala | 4 +- .../execution/datasources/csv/CSVSuite.scala | 33 ++++++++++++++++ 4 files changed, 53 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/b461acb2/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index 1b3a9fc..5b3b5c2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -506,10 +506,11 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { StructType(schema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord)) val linesWithoutHeader: RDD[String] = maybeFirstLine.map { firstLine => - CSVDataSource.checkHeader( - firstLine, - new CsvParser(parsedOptions.asParserSettings), + val parser = new CsvParser(parsedOptions.asParserSettings) + val columnNames = parser.parseLine(firstLine) + CSVDataSource.checkHeaderColumnNames( actualSchema, + columnNames, csvDataset.getClass.getCanonicalName, parsedOptions.enforceSchema, sparkSession.sessionState.conf.caseSensitiveAnalysis) http://git-wip-us.apache.org/repos/asf/spark/blob/b461acb2/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala index b7b46c7..2b86054 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala @@ -54,7 +54,8 @@ abstract class CSVDataSource extends Serializable { requiredSchema: StructType, // Actual schema of data in the csv file dataSchema: StructType, - caseSensitive: Boolean): Iterator[InternalRow] + caseSensitive: Boolean, + columnPruning: Boolean): Iterator[InternalRow] /** * Infers the schema from `inputPaths` files. 
@@ -181,25 +182,6 @@ object CSVDataSource extends Logging { } } } - - /** - * Checks that CSV header contains the same column names as fields names in the given schema - * by taking into account case sensitivity. - */ - def checkHeader( - header: String, - parser: CsvParser, - schema: StructType, - fileName: String, - enforceSchema: Boolean, - caseSensitive: Boolean): Unit = { - checkHeaderColumnNames( - schema, - parser.parseLine(header), - fileName, - enforceSchema, - caseSensitive) - } } object TextInputCSVDataSource extends CSVDataSource { @@ -211,7 +193,8 @@ object TextInputCSVDataSource extends CSVDataSource { parser: UnivocityParser, requiredSchema: StructType, dataSchema: StructType, - caseSensitive: Boolean): Iterator[InternalRow] = { + caseSensitive: Boolean, + columnPruning: Boolean): Iterator[InternalRow] = { val lines = { val linesReader = new HadoopFileLinesReader(file, conf) Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => linesReader.close())) @@ -227,10 +210,11 @@ object TextInputCSVDataSource extends CSVDataSource { // Note: if there are only comments in the first block, the header would probably // be not extracted. 
CSVUtils.extractHeader(lines, parser.options).foreach { header => - CSVDataSource.checkHeader( - header, - parser.tokenizer, - dataSchema, + val schema = if (columnPruning) requiredSchema else dataSchema + val columnNames = parser.tokenizer.parseLine(header) + CSVDataSource.checkHeaderColumnNames( + schema, + columnNames, file.filePath, parser.options.enforceSchema, caseSensitive) @@ -308,10 +292,12 @@ object MultiLineCSVDataSource extends CSVDataSource { parser: UnivocityParser, requiredSchema: StructType, dataSchema: StructType, - caseSensitive: Boolean): Iterator[InternalRow] = { + caseSensitive: Boolean, + columnPruning: Boolean): Iterator[InternalRow] = { def checkHeader(header: Array[String]): Unit = { + val schema = if (columnPruning) requiredSchema else dataSchema CSVDataSource.checkHeaderColumnNames( - dataSchema, + schema, header, file.filePath, parser.options.enforceSchema, http://git-wip-us.apache.org/repos/asf/spark/blob/b461acb2/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala index d59b982..9aad0bd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala @@ -131,6 +131,7 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister { ) } val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis + val columnPruning = sparkSession.sessionState.conf.csvColumnPruning (file: PartitionedFile) => { val conf = broadcastedHadoopConf.value.value @@ -144,7 +145,8 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister { parser, requiredSchema, dataSchema, - 
caseSensitive) + caseSensitive, + columnPruning) } } http://git-wip-us.apache.org/repos/asf/spark/blob/b461acb2/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index 14840e5..5a1d667 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -1603,6 +1603,39 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te .exists(msg => msg.getRenderedMessage.contains("CSV header does not conform to the schema"))) } + test("SPARK-25134: check header on parsing of dataset with projection and column pruning") { + withSQLConf(SQLConf.CSV_PARSER_COLUMN_PRUNING.key -> "true") { + Seq(false, true).foreach { multiLine => + withTempPath { path => + val dir = path.getAbsolutePath + Seq(("a", "b")).toDF("columnA", "columnB").write + .format("csv") + .option("header", true) + .save(dir) + + // schema with one column + checkAnswer(spark.read + .format("csv") + .option("header", true) + .option("enforceSchema", false) + .option("multiLine", multiLine) + .load(dir) + .select("columnA"), + Row("a")) + + // empty schema + assert(spark.read + .format("csv") + .option("header", true) + .option("enforceSchema", false) + .option("multiLine", multiLine) + .load(dir) + .count() === 1L) + } + } + } + } + test("SPARK-24645 skip parsing when columnPruning enabled and partitions scanned only") { withSQLConf(SQLConf.CSV_PARSER_COLUMN_PRUNING.key -> "true") { withTempPath { path => --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: 
commits-h...@spark.apache.org