This is an automated email from the ASF dual-hosted git repository.
maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 829e742df825 [SPARK-46862][SQL] Disable CSV column pruning in the
multi-line mode
829e742df825 is described below
commit 829e742df8251c6f5e965cb08ad454ac3ee1a389
Author: Max Gekk <[email protected]>
AuthorDate: Fri Jan 26 11:02:15 2024 +0300
[SPARK-46862][SQL] Disable CSV column pruning in the multi-line mode
### What changes were proposed in this pull request?
In the PR, I propose to disable the column pruning feature in the CSV
datasource for the `multiLine` mode.
### Why are the changes needed?
To work around the issue in the `uniVocity` parser used by the CSV
datasource: https://github.com/uniVocity/univocity-parsers/issues/529
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
By running the affected test suites:
```
$ build/sbt "test:testOnly *CSVv1Suite"
$ build/sbt "test:testOnly *CSVv2Suite"
$ build/sbt "test:testOnly *CSVLegacyTimeParserSuite"
$ build/sbt "testOnly *.CsvFunctionsSuite"
```
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #44872 from MaxGekk/csv-disable-column-pruning.
Authored-by: Max Gekk <[email protected]>
Signed-off-by: Max Gekk <[email protected]>
---
.../apache/spark/sql/catalyst/csv/CSVOptions.scala | 10 +++++++++
.../spark/sql/catalyst/csv/UnivocityParser.scala | 2 +-
.../v2/csv/CSVPartitionReaderFactory.scala | 2 +-
.../sql/execution/datasources/csv/CSVSuite.scala | 25 +++++++++++++++++++++-
4 files changed, 36 insertions(+), 3 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
index 845c815c5648..c5a6bf5076de 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
@@ -277,6 +277,15 @@ class CSVOptions(
val unescapedQuoteHandling: UnescapedQuoteHandling =
UnescapedQuoteHandling.valueOf(parameters
.getOrElse(UNESCAPED_QUOTE_HANDLING,
"STOP_AT_DELIMITER").toUpperCase(Locale.ROOT))
+ /**
+ * The column pruning feature can be enabled either via the CSV option
`columnPruning` or
+ * in non-multiline mode via initialization of CSV options by the SQL config:
+ * `spark.sql.csv.parser.columnPruning.enabled`.
+ * The feature is disabled in the `multiLine` mode because of the issue:
+ * https://github.com/uniVocity/univocity-parsers/issues/529
+ */
+ val isColumnPruningEnabled: Boolean = getBool(COLUMN_PRUNING, !multiLine &&
columnPruning)
+
def asWriterSettings: CsvWriterSettings = {
val writerSettings = new CsvWriterSettings()
val format = writerSettings.getFormat
@@ -376,4 +385,5 @@ object CSVOptions extends DataSourceOptions {
val SEP = "sep"
val DELIMITER = "delimiter"
newOption(SEP, DELIMITER)
+ val COLUMN_PRUNING = newOption("columnPruning")
}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
index eb7e120277bb..34a8b3d09047 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
@@ -71,7 +71,7 @@ class UnivocityParser(
// positions. Generally assigned by input configuration options, except when
input column(s) have
// default values, in which case we omit the explicit indexes in order to
know how many tokens
// were present in each line instead.
- private def columnPruning: Boolean = options.columnPruning &&
+ private def columnPruning: Boolean = options.isColumnPruningEnabled &&
!requiredSchema.exists(_.metadata.contains(EXISTS_DEFAULT_COLUMN_METADATA_KEY))
// When column pruning is enabled, the parser only parses the required
columns based on
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVPartitionReaderFactory.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVPartitionReaderFactory.scala
index 37f6ae4aaa9f..cef5a71ca9c6 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVPartitionReaderFactory.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVPartitionReaderFactory.scala
@@ -58,7 +58,7 @@ case class CSVPartitionReaderFactory(
actualReadDataSchema,
options,
filters)
- val schema = if (options.columnPruning) actualReadDataSchema else
actualDataSchema
+ val schema = if (options.isColumnPruningEnabled) actualReadDataSchema else
actualDataSchema
val isStartOfFile = file.start == 0
val headerChecker = new CSVHeaderChecker(
schema, options, source = s"CSV file: ${file.urlEncodedPath}",
isStartOfFile)
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index e1e39ac1590f..8e6282bd5a42 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -2089,6 +2089,7 @@ abstract class CSVSuite
.option("header", true)
.option("enforceSchema", false)
.option("multiLine", multiLine)
+ .option("columnPruning", true)
.load(dir)
.select("columnA"),
Row("a"))
@@ -2099,6 +2100,7 @@ abstract class CSVSuite
.option("header", true)
.option("enforceSchema", false)
.option("multiLine", multiLine)
+ .option("columnPruning", true)
.load(dir)
.count() === 1L)
}
@@ -3163,7 +3165,7 @@ abstract class CSVSuite
}
test("SPARK-40667: validate CSV Options") {
- assert(CSVOptions.getAllOptions.size == 38)
+ assert(CSVOptions.getAllOptions.size == 39)
// Please add validation on any new CSV options here
assert(CSVOptions.isValidOption("header"))
assert(CSVOptions.isValidOption("inferSchema"))
@@ -3203,6 +3205,7 @@ abstract class CSVSuite
assert(CSVOptions.isValidOption("codec"))
assert(CSVOptions.isValidOption("sep"))
assert(CSVOptions.isValidOption("delimiter"))
+ assert(CSVOptions.isValidOption("columnPruning"))
// Please add validation on any new parquet options with alternative here
assert(CSVOptions.getAlternativeOption("sep").contains("delimiter"))
assert(CSVOptions.getAlternativeOption("delimiter").contains("sep"))
@@ -3212,6 +3215,26 @@ abstract class CSVSuite
assert(CSVOptions.getAlternativeOption("codec").contains("compression"))
assert(CSVOptions.getAlternativeOption("preferDate").isEmpty)
}
+
+ test("SPARK-46862: column pruning in the multi-line mode") {
+ val data =
+ """"jobID","Name","City","Active"
+ |"1","DE","","Yes"
+ |"5",",","",","
+ |"3","SA","","No"
+ |"10","abcd""efgh"" \ndef","",""
+ |"8","SE","","No"""".stripMargin
+
+ withTempPath { path =>
+ Files.write(path.toPath, data.getBytes(StandardCharsets.UTF_8))
+ val df = spark.read
+ .option("multiline", "true")
+ .option("header", "true")
+ .option("escape", "\"")
+ .csv(path.getCanonicalPath)
+ assert(df.count() === 5)
+ }
+ }
}
class CSVv1Suite extends CSVSuite {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]