This is an automated email from the ASF dual-hosted git repository.
maxgekk pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push:
new 5254840547b4 [SPARK-46862][SQL][FOLLOWUP] Fix column pruning without
schema enforcing in V1 CSV datasource
5254840547b4 is described below
commit 5254840547b48390ad98dd61dc1fc505a82ce8fc
Author: Max Gekk <[email protected]>
AuthorDate: Sat Jan 27 19:22:52 2024 +0300
[SPARK-46862][SQL][FOLLOWUP] Fix column pruning without schema enforcing in
V1 CSV datasource
### What changes were proposed in this pull request?
In the PR, I propose to invoke `CSVOptions.isColumnPruningEnabled`,
introduced by https://github.com/apache/spark/pull/44872, while matching the CSV
header against the schema in the V1 CSV datasource.
### Why are the changes needed?
To fix the failure when column pruning happens and a schema is not enforced:
```scala
scala> spark.read.
| option("multiLine", true).
| option("header", true).
| option("escape", "\"").
| option("enforceSchema", false).
| csv("/Users/maximgekk/tmp/es-939111-data.csv").
| count()
24/01/27 12:43:14 ERROR Executor: Exception in task 0.0 in stage 3.0 (TID 3)
java.lang.IllegalArgumentException: Number of column in CSV header is not
equal to number of fields in the schema:
Header length: 4, schema size: 0
CSV file: file:///Users/maximgekk/tmp/es-939111-data.csv
```
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
By running the affected test suites:
```
$ build/sbt "test:testOnly *CSVv1Suite"
$ build/sbt "test:testOnly *CSVv2Suite"
$ build/sbt "test:testOnly *CSVLegacyTimeParserSuite"
$ build/sbt "testOnly *.CsvFunctionsSuite"
```
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #44910 from MaxGekk/check-header-column-pruning.
Authored-by: Max Gekk <[email protected]>
Signed-off-by: Max Gekk <[email protected]>
(cherry picked from commit bc51c9fea3645c6ae1d9e1e83b0f94f8b849be20)
Signed-off-by: Max Gekk <[email protected]>
---
.../sql/execution/datasources/csv/CSVFileFormat.scala | 6 +++---
.../spark/sql/execution/datasources/csv/CSVSuite.scala | 15 +++++++++------
2 files changed, 12 insertions(+), 9 deletions(-)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
index 069ad9562a7d..0ff96f073f03 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
@@ -100,12 +100,12 @@ class CSVFileFormat extends TextBasedFileFormat with
DataSourceRegister {
hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] =
{
val broadcastedHadoopConf =
sparkSession.sparkContext.broadcast(new
SerializableConfiguration(hadoopConf))
- val columnPruning = sparkSession.sessionState.conf.csvColumnPruning
val parsedOptions = new CSVOptions(
options,
- columnPruning,
+ sparkSession.sessionState.conf.csvColumnPruning,
sparkSession.sessionState.conf.sessionLocalTimeZone,
sparkSession.sessionState.conf.columnNameOfCorruptRecord)
+ val isColumnPruningEnabled = parsedOptions.isColumnPruningEnabled
// Check a field requirement for corrupt records here to throw an
exception in a driver side
ExprUtils.verifyColumnNameOfCorruptRecord(dataSchema,
parsedOptions.columnNameOfCorruptRecord)
@@ -125,7 +125,7 @@ class CSVFileFormat extends TextBasedFileFormat with
DataSourceRegister {
actualRequiredSchema,
parsedOptions,
actualFilters)
- val schema = if (columnPruning) actualRequiredSchema else
actualDataSchema
+ val schema = if (isColumnPruningEnabled) actualRequiredSchema else
actualDataSchema
val isStartOfFile = file.start == 0
val headerChecker = new CSVHeaderChecker(
schema, parsedOptions, source = s"CSV file: ${file.urlEncodedPath}",
isStartOfFile)
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index b3fb9effebca..23dd218e1479 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -3166,12 +3166,15 @@ abstract class CSVSuite
withTempPath { path =>
Files.write(path.toPath, data.getBytes(StandardCharsets.UTF_8))
- val df = spark.read
- .option("multiline", "true")
- .option("header", "true")
- .option("escape", "\"")
- .csv(path.getCanonicalPath)
- assert(df.count() === 5)
+ Seq(true, false).foreach { enforceSchema =>
+ val df = spark.read
+ .option("multiLine", true)
+ .option("header", true)
+ .option("escape", "\"")
+ .option("enforceSchema", enforceSchema)
+ .csv(path.getCanonicalPath)
+ assert(df.count() === 5)
+ }
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]