This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 829e742df825 [SPARK-46862][SQL] Disable CSV column pruning in the multi-line mode 829e742df825 is described below commit 829e742df8251c6f5e965cb08ad454ac3ee1a389 Author: Max Gekk <max.g...@gmail.com> AuthorDate: Fri Jan 26 11:02:15 2024 +0300 [SPARK-46862][SQL] Disable CSV column pruning in the multi-line mode ### What changes were proposed in this pull request? In the PR, I propose to disable the column pruning feature in the CSV datasource for the `multiLine` mode. ### Why are the changes needed? To work around the issue in the `uniVocity` parser used by the CSV datasource: https://github.com/uniVocity/univocity-parsers/issues/529 ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? By running the affected test suites: ``` $ build/sbt "test:testOnly *CSVv1Suite" $ build/sbt "test:testOnly *CSVv2Suite" $ build/sbt "test:testOnly *CSVLegacyTimeParserSuite" $ build/sbt "testOnly *.CsvFunctionsSuite" ``` ### Was this patch authored or co-authored using generative AI tooling? No. Closes #44872 from MaxGekk/csv-disable-column-pruning. 
Authored-by: Max Gekk <max.g...@gmail.com> Signed-off-by: Max Gekk <max.g...@gmail.com> --- .../apache/spark/sql/catalyst/csv/CSVOptions.scala | 10 +++++++++ .../spark/sql/catalyst/csv/UnivocityParser.scala | 2 +- .../v2/csv/CSVPartitionReaderFactory.scala | 2 +- .../sql/execution/datasources/csv/CSVSuite.scala | 25 +++++++++++++++++++++- 4 files changed, 36 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala index 845c815c5648..c5a6bf5076de 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala @@ -277,6 +277,15 @@ class CSVOptions( val unescapedQuoteHandling: UnescapedQuoteHandling = UnescapedQuoteHandling.valueOf(parameters .getOrElse(UNESCAPED_QUOTE_HANDLING, "STOP_AT_DELIMITER").toUpperCase(Locale.ROOT)) + /** + * The column pruning feature can be enabled either via the CSV option `columnPruning` or + * in non-multiline mode via initialization of CSV options by the SQL config: + * `spark.sql.csv.parser.columnPruning.enabled`. 
+ * The feature is disabled in the `multiLine` mode because of the issue: + * https://github.com/uniVocity/univocity-parsers/issues/529 + */ + val isColumnPruningEnabled: Boolean = getBool(COLUMN_PRUNING, !multiLine && columnPruning) + def asWriterSettings: CsvWriterSettings = { val writerSettings = new CsvWriterSettings() val format = writerSettings.getFormat @@ -376,4 +385,5 @@ object CSVOptions extends DataSourceOptions { val SEP = "sep" val DELIMITER = "delimiter" newOption(SEP, DELIMITER) + val COLUMN_PRUNING = newOption("columnPruning") } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala index eb7e120277bb..34a8b3d09047 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala @@ -71,7 +71,7 @@ class UnivocityParser( // positions. Generally assigned by input configuration options, except when input column(s) have // default values, in which case we omit the explicit indexes in order to know how many tokens // were present in each line instead. 
- private def columnPruning: Boolean = options.columnPruning && + private def columnPruning: Boolean = options.isColumnPruningEnabled && !requiredSchema.exists(_.metadata.contains(EXISTS_DEFAULT_COLUMN_METADATA_KEY)) // When column pruning is enabled, the parser only parses the required columns based on diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVPartitionReaderFactory.scala index 37f6ae4aaa9f..cef5a71ca9c6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVPartitionReaderFactory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVPartitionReaderFactory.scala @@ -58,7 +58,7 @@ case class CSVPartitionReaderFactory( actualReadDataSchema, options, filters) - val schema = if (options.columnPruning) actualReadDataSchema else actualDataSchema + val schema = if (options.isColumnPruningEnabled) actualReadDataSchema else actualDataSchema val isStartOfFile = file.start == 0 val headerChecker = new CSVHeaderChecker( schema, options, source = s"CSV file: ${file.urlEncodedPath}", isStartOfFile) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index e1e39ac1590f..8e6282bd5a42 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -2089,6 +2089,7 @@ abstract class CSVSuite .option("header", true) .option("enforceSchema", false) .option("multiLine", multiLine) + .option("columnPruning", true) .load(dir) .select("columnA"), Row("a")) @@ -2099,6 +2100,7 @@ abstract class CSVSuite .option("header", true) .option("enforceSchema", false) .option("multiLine", multiLine) + 
.option("columnPruning", true) .load(dir) .count() === 1L) } @@ -3163,7 +3165,7 @@ abstract class CSVSuite } test("SPARK-40667: validate CSV Options") { - assert(CSVOptions.getAllOptions.size == 38) + assert(CSVOptions.getAllOptions.size == 39) // Please add validation on any new CSV options here assert(CSVOptions.isValidOption("header")) assert(CSVOptions.isValidOption("inferSchema")) @@ -3203,6 +3205,7 @@ abstract class CSVSuite assert(CSVOptions.isValidOption("codec")) assert(CSVOptions.isValidOption("sep")) assert(CSVOptions.isValidOption("delimiter")) + assert(CSVOptions.isValidOption("columnPruning")) // Please add validation on any new parquet options with alternative here assert(CSVOptions.getAlternativeOption("sep").contains("delimiter")) assert(CSVOptions.getAlternativeOption("delimiter").contains("sep")) @@ -3212,6 +3215,26 @@ abstract class CSVSuite assert(CSVOptions.getAlternativeOption("codec").contains("compression")) assert(CSVOptions.getAlternativeOption("preferDate").isEmpty) } + + test("SPARK-46862: column pruning in the multi-line mode") { + val data = + """"jobID","Name","City","Active" + |"1","DE","","Yes" + |"5",",","","," + |"3","SA","","No" + |"10","abcd""efgh"" \ndef","","" + |"8","SE","","No"""".stripMargin + + withTempPath { path => + Files.write(path.toPath, data.getBytes(StandardCharsets.UTF_8)) + val df = spark.read + .option("multiline", "true") + .option("header", "true") + .option("escape", "\"") + .csv(path.getCanonicalPath) + assert(df.count() === 5) + } + } } class CSVv1Suite extends CSVSuite { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org