This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new a92ef00145b [SPARK-38767][SQL] Support `ignoreCorruptFiles` and
`ignoreMissingFiles` in Data Source options
a92ef00145b is described below
commit a92ef00145b264013e11de12f2c7cee62c28198d
Author: yaohua <[email protected]>
AuthorDate: Tue Apr 12 12:50:13 2022 -0700
[SPARK-38767][SQL] Support `ignoreCorruptFiles` and `ignoreMissingFiles` in
Data Source options
### What changes were proposed in this pull request?
Support `ignoreCorruptFiles` and `ignoreMissingFiles` in Data Source
options for both V1 and V2 file-based data sources.
```
spark.read...option("ignoreCorruptFiles", "true")...
spark.read...option("ignoreMissingFiles", "true")...
```
### Why are the changes needed?
Improve the user experience: these settings can now be specified per read via data source options, instead of only globally through the session configuration.
### Does this PR introduce _any_ user-facing change?
Yes.
Previously:
```
spark.sql("set spark.sql.files.ignoreCorruptFiles=true")
spark.sql("set spark.sql.files.ignoreMissingFiles=true")
spark.read...
spark.read...
```
Now:
```
spark.read...option("ignoreCorruptFiles", "true")...
spark.read...option("ignoreMissingFiles", "true")...
```
### How was this patch tested?
Enhanced the existing unit tests covering `ignoreMissingFiles` and `ignoreCorruptFiles`.
Closes #36069 from Yaohua628/spark-38767.
Authored-by: yaohua <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../org/apache/spark/sql/avro/AvroOptions.scala | 4 +-
.../org/apache/spark/sql/avro/AvroUtils.scala | 5 ++-
.../sql/v2/avro/AvroPartitionReaderFactory.scala | 12 +++---
docs/sql-data-sources-generic-options.md | 4 +-
.../examples/sql/JavaSQLDataSourceExample.java | 19 +++++++--
examples/src/main/python/sql/datasource.py | 21 ++++++++--
examples/src/main/r/RSparkSQLExample.R | 14 +++++--
.../spark/examples/sql/SQLDataSourceExample.scala | 19 +++++++--
.../spark/sql/catalyst/FileSourceOptions.scala | 42 +++++++++++++++++++
.../apache/spark/sql/catalyst/csv/CSVOptions.scala | 3 +-
.../spark/sql/catalyst/json/JSONOptions.scala | 3 +-
.../spark/sql/execution/DataSourceScanExec.scala | 8 ++--
.../sql/execution/datasources/FileScanRDD.scala | 10 +++--
.../execution/datasources/SchemaMergeUtils.scala | 5 ++-
.../execution/datasources/orc/OrcFileFormat.scala | 5 ++-
.../sql/execution/datasources/orc/OrcOptions.scala | 3 +-
.../sql/execution/datasources/orc/OrcUtils.scala | 7 ++--
.../datasources/parquet/ParquetOptions.scala | 3 +-
.../execution/datasources/text/TextOptions.scala | 3 +-
.../datasources/v2/FilePartitionReader.scala | 11 ++---
.../v2/FilePartitionReaderFactory.scala | 9 +++--
.../v2/csv/CSVPartitionReaderFactory.scala | 16 ++++----
.../v2/json/JsonPartitionReaderFactory.scala | 10 ++---
.../v2/orc/OrcPartitionReaderFactory.scala | 8 ++--
.../sql/execution/datasources/v2/orc/OrcScan.scala | 6 ++-
.../v2/parquet/ParquetPartitionReaderFactory.scala | 8 ++--
.../v2/text/TextPartitionReaderFactory.scala | 8 ++--
.../spark/sql/FileBasedDataSourceSuite.scala | 31 ++++++++------
.../datasources/parquet/ParquetQuerySuite.scala | 47 +++++++++++++++-------
29 files changed, 241 insertions(+), 103 deletions(-)
diff --git
a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
index 48b2c3481a6..7f6a274753c 100644
--- a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
+++ b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
@@ -25,6 +25,7 @@ import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.FileSourceOptions
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, FailFastMode,
ParseMode}
import org.apache.spark.sql.internal.SQLConf
@@ -33,7 +34,8 @@ import org.apache.spark.sql.internal.SQLConf
*/
private[sql] class AvroOptions(
@transient val parameters: CaseInsensitiveMap[String],
- @transient val conf: Configuration) extends Logging with Serializable {
+ @transient val conf: Configuration)
+ extends FileSourceOptions(parameters) with Logging {
def this(parameters: Map[String, String], conf: Configuration) = {
this(CaseInsensitiveMap(parameters), conf)
diff --git
a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala
b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala
index ef9d22f35d0..d03902faab9 100644
--- a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala
+++ b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala
@@ -35,7 +35,8 @@ import org.apache.spark.SparkException
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.avro.AvroOptions.ignoreExtensionKey
-import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.{FileSourceOptions, InternalRow}
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.execution.datasources.OutputWriterFactory
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
@@ -57,7 +58,7 @@ private[sql] object AvroUtils extends Logging {
val avroSchema = parsedOptions.schema
.getOrElse {
inferAvroSchemaFromFiles(files, conf, parsedOptions.ignoreExtension,
- spark.sessionState.conf.ignoreCorruptFiles)
+ new
FileSourceOptions(CaseInsensitiveMap(options)).ignoreCorruptFiles)
}
SchemaConverters.toSqlType(avroSchema).dataType match {
diff --git
a/connector/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala
b/connector/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala
index a4dfdbfe68f..3ad63f113fe 100644
---
a/connector/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala
+++
b/connector/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala
@@ -46,7 +46,7 @@ import org.apache.spark.util.SerializableConfiguration
* @param dataSchema Schema of AVRO files.
* @param readDataSchema Required data schema of AVRO files.
* @param partitionSchema Schema of partitions.
- * @param parsedOptions Options for parsing AVRO files.
+ * @param options Options for parsing AVRO files.
*/
case class AvroPartitionReaderFactory(
sqlConf: SQLConf,
@@ -54,15 +54,15 @@ case class AvroPartitionReaderFactory(
dataSchema: StructType,
readDataSchema: StructType,
partitionSchema: StructType,
- parsedOptions: AvroOptions,
+ options: AvroOptions,
filters: Seq[Filter]) extends FilePartitionReaderFactory with Logging {
- private val datetimeRebaseModeInRead = parsedOptions.datetimeRebaseModeInRead
+ private val datetimeRebaseModeInRead = options.datetimeRebaseModeInRead
override def buildReader(partitionedFile: PartitionedFile):
PartitionReader[InternalRow] = {
val conf = broadcastedConf.value.value
- val userProvidedSchema = parsedOptions.schema
+ val userProvidedSchema = options.schema
- if (parsedOptions.ignoreExtension ||
partitionedFile.filePath.endsWith(".avro")) {
+ if (options.ignoreExtension || partitionedFile.filePath.endsWith(".avro"))
{
val reader = {
val in = new FsInput(new Path(new URI(partitionedFile.filePath)), conf)
try {
@@ -104,7 +104,7 @@ case class AvroPartitionReaderFactory(
override val deserializer = new AvroDeserializer(
userProvidedSchema.getOrElse(reader.getSchema),
readDataSchema,
- parsedOptions.positionalFieldMatching,
+ options.positionalFieldMatching,
datetimeRebaseMode,
avroFilters)
override val stopPosition = partitionedFile.start +
partitionedFile.length
diff --git a/docs/sql-data-sources-generic-options.md
b/docs/sql-data-sources-generic-options.md
index 2e4fc879a43..7835371ec43 100644
--- a/docs/sql-data-sources-generic-options.md
+++ b/docs/sql-data-sources-generic-options.md
@@ -38,7 +38,7 @@ dir1/
### Ignore Corrupt Files
-Spark allows you to use `spark.sql.files.ignoreCorruptFiles` to ignore corrupt
files while reading data
+Spark allows you to use the configuration `spark.sql.files.ignoreCorruptFiles`
or the data source option `ignoreCorruptFiles` to ignore corrupt files while
reading data
from files. When set to true, the Spark jobs will continue to run when
encountering corrupted files and
the contents that have been read will still be returned.
@@ -64,7 +64,7 @@ To ignore corrupt files while reading data files, you can use:
### Ignore Missing Files
-Spark allows you to use `spark.sql.files.ignoreMissingFiles` to ignore missing
files while reading data
+Spark allows you to use the configuration `spark.sql.files.ignoreMissingFiles`
or the data source option `ignoreMissingFiles` to ignore missing files while
reading data
from files. Here, missing file really means the deleted file under directory
after you construct the
`DataFrame`. When set to true, the Spark jobs will continue to run when
encountering missing files and
the contents that have been read will still be returned.
diff --git
a/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java
b/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java
index 5dcf321a4c8..c0960540b49 100644
---
a/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java
+++
b/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java
@@ -115,13 +115,26 @@ public class JavaSQLDataSourceExample {
private static void runGenericFileSourceOptionsExample(SparkSession spark) {
// $example on:ignore_corrupt_files$
- // enable ignore corrupt files
+ // enable ignore corrupt files via the data source option
+ // dir1/file3.json is corrupt from parquet's view
+ Dataset<Row> testCorruptDF0 = spark.read().option("ignoreCorruptFiles",
"true").parquet(
+ "examples/src/main/resources/dir1/",
+ "examples/src/main/resources/dir1/dir2/");
+ testCorruptDF0.show();
+ // +-------------+
+ // | file|
+ // +-------------+
+ // |file1.parquet|
+ // |file2.parquet|
+ // +-------------+
+
+ // enable ignore corrupt files via the configuration
spark.sql("set spark.sql.files.ignoreCorruptFiles=true");
// dir1/file3.json is corrupt from parquet's view
- Dataset<Row> testCorruptDF = spark.read().parquet(
+ Dataset<Row> testCorruptDF1 = spark.read().parquet(
"examples/src/main/resources/dir1/",
"examples/src/main/resources/dir1/dir2/");
- testCorruptDF.show();
+ testCorruptDF1.show();
// +-------------+
// | file|
// +-------------+
diff --git a/examples/src/main/python/sql/datasource.py
b/examples/src/main/python/sql/datasource.py
index fd312dbf164..c7522cb9d34 100644
--- a/examples/src/main/python/sql/datasource.py
+++ b/examples/src/main/python/sql/datasource.py
@@ -28,12 +28,25 @@ from pyspark.sql import Row
def generic_file_source_options_example(spark: SparkSession) -> None:
# $example on:ignore_corrupt_files$
- # enable ignore corrupt files
+ # enable ignore corrupt files via the data source option
+ # dir1/file3.json is corrupt from parquet's view
+ test_corrupt_df0 = spark.read.option("ignoreCorruptFiles", "true")\
+ .parquet("examples/src/main/resources/dir1/",
+ "examples/src/main/resources/dir1/dir2/")
+ test_corrupt_df0.show()
+ # +-------------+
+ # | file|
+ # +-------------+
+ # |file1.parquet|
+ # |file2.parquet|
+ # +-------------+
+
+ # enable ignore corrupt files via the configuration
spark.sql("set spark.sql.files.ignoreCorruptFiles=true")
# dir1/file3.json is corrupt from parquet's view
- test_corrupt_df = spark.read.parquet("examples/src/main/resources/dir1/",
-
"examples/src/main/resources/dir1/dir2/")
- test_corrupt_df.show()
+ test_corrupt_df1 = spark.read.parquet("examples/src/main/resources/dir1/",
+
"examples/src/main/resources/dir1/dir2/")
+ test_corrupt_df1.show()
# +-------------+
# | file|
# +-------------+
diff --git a/examples/src/main/r/RSparkSQLExample.R
b/examples/src/main/r/RSparkSQLExample.R
index 15118e118ab..a7d3ae766c5 100644
--- a/examples/src/main/r/RSparkSQLExample.R
+++ b/examples/src/main/r/RSparkSQLExample.R
@@ -101,11 +101,19 @@ df <- sql("SELECT * FROM table")
# Ignore corrupt files
# $example on:ignore_corrupt_files$
-# enable ignore corrupt files
+# enable ignore corrupt files via the data source option
+# dir1/file3.json is corrupt from parquet's view
+testCorruptDF0 <- read.parquet(c("examples/src/main/resources/dir1/",
"examples/src/main/resources/dir1/dir2/"), ignoreCorruptFiles = "true")
+head(testCorruptDF0)
+# file
+# 1 file1.parquet
+# 2 file2.parquet
+
+# enable ignore corrupt files via the configuration
sql("set spark.sql.files.ignoreCorruptFiles=true")
# dir1/file3.json is corrupt from parquet's view
-testCorruptDF <- read.parquet(c("examples/src/main/resources/dir1/",
"examples/src/main/resources/dir1/dir2/"))
-head(testCorruptDF)
+testCorruptDF1 <- read.parquet(c("examples/src/main/resources/dir1/",
"examples/src/main/resources/dir1/dir2/"))
+head(testCorruptDF1)
# file
# 1 file1.parquet
# 2 file2.parquet
diff --git
a/examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala
b/examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala
index 6bd2bd6d3bf..9b04994199d 100644
---
a/examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala
+++
b/examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala
@@ -45,13 +45,26 @@ object SQLDataSourceExample {
private def runGenericFileSourceOptionsExample(spark: SparkSession): Unit = {
// $example on:ignore_corrupt_files$
- // enable ignore corrupt files
+ // enable ignore corrupt files via the data source option
+ // dir1/file3.json is corrupt from parquet's view
+ val testCorruptDF0 = spark.read.option("ignoreCorruptFiles",
"true").parquet(
+ "examples/src/main/resources/dir1/",
+ "examples/src/main/resources/dir1/dir2/")
+ testCorruptDF0.show()
+ // +-------------+
+ // | file|
+ // +-------------+
+ // |file1.parquet|
+ // |file2.parquet|
+ // +-------------+
+
+ // enable ignore corrupt files via the configuration
spark.sql("set spark.sql.files.ignoreCorruptFiles=true")
// dir1/file3.json is corrupt from parquet's view
- val testCorruptDF = spark.read.parquet(
+ val testCorruptDF1 = spark.read.parquet(
"examples/src/main/resources/dir1/",
"examples/src/main/resources/dir1/dir2/")
- testCorruptDF.show()
+ testCorruptDF1.show()
// +-------------+
// | file|
// +-------------+
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/FileSourceOptions.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/FileSourceOptions.scala
new file mode 100644
index 00000000000..6b9826d652e
--- /dev/null
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/FileSourceOptions.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst
+
+import org.apache.spark.sql.catalyst.FileSourceOptions.{IGNORE_CORRUPT_FILES,
IGNORE_MISSING_FILES}
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * Common options for the file-based data source.
+ */
+class FileSourceOptions(
+ @transient private val parameters: CaseInsensitiveMap[String])
+ extends Serializable {
+
+ def this(parameters: Map[String, String]) =
this(CaseInsensitiveMap(parameters))
+
+ val ignoreCorruptFiles: Boolean =
parameters.get(IGNORE_CORRUPT_FILES).map(_.toBoolean)
+ .getOrElse(SQLConf.get.ignoreCorruptFiles)
+
+ val ignoreMissingFiles: Boolean =
parameters.get(IGNORE_MISSING_FILES).map(_.toBoolean)
+ .getOrElse(SQLConf.get.ignoreMissingFiles)
+}
+
+object FileSourceOptions {
+ val IGNORE_CORRUPT_FILES = "ignoreCorruptFiles"
+ val IGNORE_MISSING_FILES = "ignoreMissingFiles"
+}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
index 2a404b14bfd..9daa50ba5a4 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
@@ -24,6 +24,7 @@ import java.util.Locale
import com.univocity.parsers.csv.{CsvParserSettings, CsvWriterSettings,
UnescapedQuoteHandling}
import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.FileSourceOptions
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.internal.SQLConf
@@ -34,7 +35,7 @@ class CSVOptions(
val columnPruning: Boolean,
defaultTimeZoneId: String,
defaultColumnNameOfCorruptRecord: String)
- extends Logging with Serializable {
+ extends FileSourceOptions(parameters) with Logging {
def this(
parameters: Map[String, String],
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
index e801912e192..5f90dbc49c9 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
@@ -25,6 +25,7 @@ import com.fasterxml.jackson.core.{JsonFactory,
JsonFactoryBuilder}
import com.fasterxml.jackson.core.json.JsonReadFeature
import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.FileSourceOptions
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
@@ -38,7 +39,7 @@ private[sql] class JSONOptions(
@transient val parameters: CaseInsensitiveMap[String],
defaultTimeZoneId: String,
defaultColumnNameOfCorruptRecord: String)
- extends Logging with Serializable {
+ extends FileSourceOptions(parameters) with Logging {
def this(
parameters: Map[String, String],
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 5067cd7fa3c..5cf8aa91ea5 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -25,12 +25,12 @@ import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.fs.Path
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.{FileSourceOptions, InternalRow,
TableIdentifier}
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning,
Partitioning, UnknownPartitioning}
-import org.apache.spark.sql.catalyst.util.truncatedString
+import org.apache.spark.sql.catalyst.util.{truncatedString, CaseInsensitiveMap}
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat
=> ParquetSource}
import org.apache.spark.sql.execution.datasources.v2.PushedDownOperators
@@ -620,7 +620,7 @@ case class FileSourceScanExec(
}
new FileScanRDD(fsRelation.sparkSession, readFile, filePartitions,
- requiredSchema, metadataColumns)
+ requiredSchema, metadataColumns, new
FileSourceOptions(CaseInsensitiveMap(relation.options)))
}
/**
@@ -677,7 +677,7 @@ case class FileSourceScanExec(
FilePartition.getFilePartitions(relation.sparkSession, splitFiles,
maxSplitBytes)
new FileScanRDD(fsRelation.sparkSession, readFile, partitions,
- requiredSchema, metadataColumns)
+ requiredSchema, metadataColumns, new
FileSourceOptions(CaseInsensitiveMap(relation.options)))
}
// Filters unused DynamicPruningExpression expressions - one which has been
replaced
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
index 20c393a5c0e..97776413509 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -27,8 +27,9 @@ import org.apache.spark.{Partition => RDDPartition,
SparkUpgradeException, TaskC
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.{InputFileBlockHolder, RDD}
import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.{FileSourceOptions, InternalRow}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference,
GenericInternalRow, JoinedRow, UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.datasources.FileFormat._
import org.apache.spark.sql.execution.vectorized.ConstantColumnVector
@@ -69,11 +70,12 @@ class FileScanRDD(
readFunction: (PartitionedFile) => Iterator[InternalRow],
@transient val filePartitions: Seq[FilePartition],
val readDataSchema: StructType,
- val metadataColumns: Seq[AttributeReference] = Seq.empty)
+ val metadataColumns: Seq[AttributeReference] = Seq.empty,
+ options: FileSourceOptions = new
FileSourceOptions(CaseInsensitiveMap(Map.empty)))
extends RDD[InternalRow](sparkSession.sparkContext, Nil) {
- private val ignoreCorruptFiles =
sparkSession.sessionState.conf.ignoreCorruptFiles
- private val ignoreMissingFiles =
sparkSession.sessionState.conf.ignoreMissingFiles
+ private val ignoreCorruptFiles = options.ignoreCorruptFiles
+ private val ignoreMissingFiles = options.ignoreMissingFiles
override def compute(split: RDDPartition, context: TaskContext):
Iterator[InternalRow] = {
val iterator = new Iterator[Object] with AutoCloseable {
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SchemaMergeUtils.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SchemaMergeUtils.scala
index 98f580f2d4a..babecfc1f38 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SchemaMergeUtils.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SchemaMergeUtils.scala
@@ -23,6 +23,8 @@ import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.spark.SparkException
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.FileSourceOptions
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.SerializableConfiguration
@@ -60,7 +62,8 @@ object SchemaMergeUtils extends Logging {
val numParallelism = Math.min(Math.max(partialFileStatusInfo.size, 1),
sparkSession.sparkContext.defaultParallelism)
- val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles
+ val ignoreCorruptFiles =
+ new FileSourceOptions(CaseInsensitiveMap(parameters)).ignoreCorruptFiles
// Issues a Spark job to read Parquet/ORC schema in parallel.
val partiallyMergedSchemas =
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
index 2b060c90153..02bc97cbdd6 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
@@ -33,9 +33,10 @@ import org.apache.orc.mapreduce._
import org.apache.spark.TaskContext
import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.{FileSourceOptions, InternalRow}
import org.apache.spark.sql.catalyst.expressions._
import
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.execution.WholeStageCodegenExec
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.sources._
@@ -133,7 +134,7 @@ class OrcFileFormat
sparkSession.sparkContext.broadcast(new
SerializableConfiguration(hadoopConf))
val isCaseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
val orcFilterPushDown = sparkSession.sessionState.conf.orcFilterPushDown
- val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles
+ val ignoreCorruptFiles = new
FileSourceOptions(CaseInsensitiveMap(options)).ignoreCorruptFiles
(file: PartitionedFile) => {
val conf = broadcastedConf.value.value
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcOptions.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcOptions.scala
index 9416996198a..ef1c2bb5b41 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcOptions.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcOptions.scala
@@ -21,6 +21,7 @@ import java.util.Locale
import org.apache.orc.OrcConf.COMPRESS
+import org.apache.spark.sql.catalyst.FileSourceOptions
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.internal.SQLConf
@@ -30,7 +31,7 @@ import org.apache.spark.sql.internal.SQLConf
class OrcOptions(
@transient private val parameters: CaseInsensitiveMap[String],
@transient private val sqlConf: SQLConf)
- extends Serializable {
+ extends FileSourceOptions(parameters) {
import OrcOptions._
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
index a68ce1a8636..79abdfe4690 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
@@ -33,12 +33,11 @@ import org.apache.spark.{SPARK_VERSION_SHORT,
SparkException}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{SPARK_VERSION_METADATA_KEY, SparkSession}
-import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.{FileSourceOptions, InternalRow}
import org.apache.spark.sql.catalyst.analysis.caseSensitiveResolution
import org.apache.spark.sql.catalyst.expressions.JoinedRow
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
-import org.apache.spark.sql.catalyst.util.CharVarcharUtils
-import org.apache.spark.sql.catalyst.util.quoteIdentifier
+import org.apache.spark.sql.catalyst.util.{quoteIdentifier,
CaseInsensitiveMap, CharVarcharUtils}
import org.apache.spark.sql.connector.expressions.aggregate.{Aggregation,
Count, CountStar, Max, Min}
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.datasources.{AggregatePushDownUtils,
SchemaMergeUtils}
@@ -143,7 +142,7 @@ object OrcUtils extends Logging {
def readSchema(sparkSession: SparkSession, files: Seq[FileStatus], options:
Map[String, String])
: Option[StructType] = {
- val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles
+ val ignoreCorruptFiles = new
FileSourceOptions(CaseInsensitiveMap(options)).ignoreCorruptFiles
val conf = sparkSession.sessionState.newHadoopConfWithOptions(options)
files.iterator.map(file => readSchema(file.getPath, conf,
ignoreCorruptFiles)).collectFirst {
case Some(schema) =>
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
index da0c163dd85..07ed55b0b8f 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
@@ -22,6 +22,7 @@ import java.util.Locale
import org.apache.parquet.hadoop.ParquetOutputFormat
import org.apache.parquet.hadoop.metadata.CompressionCodecName
+import org.apache.spark.sql.catalyst.FileSourceOptions
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.internal.SQLConf
@@ -31,7 +32,7 @@ import org.apache.spark.sql.internal.SQLConf
class ParquetOptions(
@transient private val parameters: CaseInsensitiveMap[String],
@transient private val sqlConf: SQLConf)
- extends Serializable {
+ extends FileSourceOptions(parameters) {
import ParquetOptions._
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala
index ef132162750..f1a1d465d1b 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala
@@ -19,13 +19,14 @@ package org.apache.spark.sql.execution.datasources.text
import java.nio.charset.{Charset, StandardCharsets}
+import org.apache.spark.sql.catalyst.FileSourceOptions
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap,
CompressionCodecs}
/**
* Options for the Text data source.
*/
class TextOptions(@transient private val parameters:
CaseInsensitiveMap[String])
- extends Serializable {
+ extends FileSourceOptions(parameters) {
import TextOptions._
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReader.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReader.scala
index 00efd48f951..782c1f50d80 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReader.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReader.scala
@@ -23,18 +23,19 @@ import scala.util.control.NonFatal
import org.apache.spark.SparkUpgradeException
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.InputFileBlockHolder
+import org.apache.spark.sql.catalyst.FileSourceOptions
import org.apache.spark.sql.connector.read.PartitionReader
import org.apache.spark.sql.errors.QueryExecutionErrors
import
org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException
-import org.apache.spark.sql.internal.SQLConf
-class FilePartitionReader[T](readers: Iterator[PartitionedFileReader[T]])
+class FilePartitionReader[T](
+ readers: Iterator[PartitionedFileReader[T]],
+ options: FileSourceOptions)
extends PartitionReader[T] with Logging {
private var currentReader: PartitionedFileReader[T] = null
- private val sqlConf = SQLConf.get
- private def ignoreMissingFiles = sqlConf.ignoreMissingFiles
- private def ignoreCorruptFiles = sqlConf.ignoreCorruptFiles
+ private def ignoreMissingFiles = options.ignoreMissingFiles
+ private def ignoreCorruptFiles = options.ignoreCorruptFiles
override def next(): Boolean = {
if (currentReader == null) {
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReaderFactory.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReaderFactory.scala
index da4f9e89fde..d7b88b505b0 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReaderFactory.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReaderFactory.scala
@@ -16,20 +16,23 @@
*/
package org.apache.spark.sql.execution.datasources.v2
-import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.{FileSourceOptions, InternalRow}
import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader,
PartitionReaderFactory}
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.datasources.{FilePartition,
PartitionedFile}
import org.apache.spark.sql.vectorized.ColumnarBatch
abstract class FilePartitionReaderFactory extends PartitionReaderFactory {
+
+ protected def options: FileSourceOptions
+
override def createReader(partition: InputPartition):
PartitionReader[InternalRow] = {
assert(partition.isInstanceOf[FilePartition])
val filePartition = partition.asInstanceOf[FilePartition]
val iter = filePartition.files.iterator.map { file =>
PartitionedFileReader(file, buildReader(file))
}
- new FilePartitionReader[InternalRow](iter)
+ new FilePartitionReader[InternalRow](iter, options)
}
override def createColumnarReader(partition: InputPartition):
PartitionReader[ColumnarBatch] = {
@@ -38,7 +41,7 @@ abstract class FilePartitionReaderFactory extends
PartitionReaderFactory {
val iter = filePartition.files.iterator.map { file =>
PartitionedFileReader(file, buildColumnarReader(file))
}
- new FilePartitionReader[ColumnarBatch](iter)
+ new FilePartitionReader[ColumnarBatch](iter, options)
}
def buildReader(partitionedFile: PartitionedFile):
PartitionReader[InternalRow]
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVPartitionReaderFactory.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVPartitionReaderFactory.scala
index bf996ab1b31..f8a17c8eaa8 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVPartitionReaderFactory.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVPartitionReaderFactory.scala
@@ -36,7 +36,7 @@ import org.apache.spark.util.SerializableConfiguration
* @param dataSchema Schema of CSV files.
* @param readDataSchema Required data schema in the batch scan.
* @param partitionSchema Schema of partitions.
- * @param parsedOptions Options for parsing CSV files.
+ * @param options Options for parsing CSV files.
*/
case class CSVPartitionReaderFactory(
sqlConf: SQLConf,
@@ -44,25 +44,25 @@ case class CSVPartitionReaderFactory(
dataSchema: StructType,
readDataSchema: StructType,
partitionSchema: StructType,
- parsedOptions: CSVOptions,
+ options: CSVOptions,
filters: Seq[Filter]) extends FilePartitionReaderFactory {
override def buildReader(file: PartitionedFile):
PartitionReader[InternalRow] = {
val conf = broadcastedConf.value.value
val actualDataSchema = StructType(
- dataSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord))
+ dataSchema.filterNot(_.name == options.columnNameOfCorruptRecord))
val actualReadDataSchema = StructType(
- readDataSchema.filterNot(_.name ==
parsedOptions.columnNameOfCorruptRecord))
+ readDataSchema.filterNot(_.name == options.columnNameOfCorruptRecord))
val parser = new UnivocityParser(
actualDataSchema,
actualReadDataSchema,
- parsedOptions,
+ options,
filters)
- val schema = if (parsedOptions.columnPruning) actualReadDataSchema else
actualDataSchema
+ val schema = if (options.columnPruning) actualReadDataSchema else
actualDataSchema
val isStartOfFile = file.start == 0
val headerChecker = new CSVHeaderChecker(
- schema, parsedOptions, source = s"CSV file: ${file.filePath}",
isStartOfFile)
- val iter = CSVDataSource(parsedOptions).readFile(
+ schema, options, source = s"CSV file: ${file.filePath}", isStartOfFile)
+ val iter = CSVDataSource(options).readFile(
conf,
file,
parser,
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonPartitionReaderFactory.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonPartitionReaderFactory.scala
index 9737803b597..d9cd41dd560 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonPartitionReaderFactory.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonPartitionReaderFactory.scala
@@ -36,7 +36,7 @@ import org.apache.spark.util.SerializableConfiguration
* @param dataSchema Schema of JSON files.
* @param readDataSchema Required schema of JSON files.
* @param partitionSchema Schema of partitions.
- * @param parsedOptions Options for parsing JSON files.
+ * @param options Options for parsing JSON files.
* @param filters The filters pushed down to JSON datasource.
*/
case class JsonPartitionReaderFactory(
@@ -45,18 +45,18 @@ case class JsonPartitionReaderFactory(
dataSchema: StructType,
readDataSchema: StructType,
partitionSchema: StructType,
- parsedOptions: JSONOptionsInRead,
+ options: JSONOptionsInRead,
filters: Seq[Filter]) extends FilePartitionReaderFactory {
override def buildReader(partitionedFile: PartitionedFile):
PartitionReader[InternalRow] = {
val actualSchema =
- StructType(readDataSchema.filterNot(_.name ==
parsedOptions.columnNameOfCorruptRecord))
+ StructType(readDataSchema.filterNot(_.name ==
options.columnNameOfCorruptRecord))
val parser = new JacksonParser(
actualSchema,
- parsedOptions,
+ options,
allowArrayAsStructs = true,
filters)
- val iter = JsonDataSource(parsedOptions).readFile(
+ val iter = JsonDataSource(options).readFile(
broadcastedConf.value.value,
partitionedFile,
parser,
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala
index ef13beaf9b4..59e3214f047 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala
@@ -33,7 +33,7 @@ import
org.apache.spark.sql.connector.expressions.aggregate.Aggregation
import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader}
import org.apache.spark.sql.execution.WholeStageCodegenExec
import org.apache.spark.sql.execution.datasources.{AggregatePushDownUtils,
PartitionedFile}
-import org.apache.spark.sql.execution.datasources.orc.{OrcColumnarBatchReader,
OrcDeserializer, OrcFilters, OrcUtils}
+import org.apache.spark.sql.execution.datasources.orc.{OrcColumnarBatchReader,
OrcDeserializer, OrcFilters, OrcOptions, OrcUtils}
import org.apache.spark.sql.execution.datasources.v2._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.Filter
@@ -49,6 +49,7 @@ import org.apache.spark.util.{SerializableConfiguration,
Utils}
* @param dataSchema Schema of orc files.
* @param readDataSchema Required data schema in the batch scan.
* @param partitionSchema Schema of partitions.
+ * @param options Options for parsing ORC files.
*/
case class OrcPartitionReaderFactory(
sqlConf: SQLConf,
@@ -57,12 +58,13 @@ case class OrcPartitionReaderFactory(
readDataSchema: StructType,
partitionSchema: StructType,
filters: Array[Filter],
- aggregation: Option[Aggregation]) extends FilePartitionReaderFactory {
+ aggregation: Option[Aggregation],
+ options: OrcOptions) extends FilePartitionReaderFactory {
private val resultSchema = StructType(readDataSchema.fields ++
partitionSchema.fields)
private val isCaseSensitive = sqlConf.caseSensitiveAnalysis
private val capacity = sqlConf.orcVectorizedReaderBatchSize
private val orcFilterPushDown = sqlConf.orcFilterPushDown
- private val ignoreCorruptFiles = sqlConf.ignoreCorruptFiles
+ private val ignoreCorruptFiles = options.ignoreCorruptFiles
override def supportColumnarReads(partition: InputPartition): Boolean = {
sqlConf.orcVectorizedReaderEnabled && sqlConf.wholeStageEnabled &&
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala
index baf307257c3..ad8857d9803 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala
@@ -16,6 +16,8 @@
*/
package org.apache.spark.sql.execution.datasources.v2.orc
+import scala.collection.JavaConverters.mapAsScalaMapConverter
+
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
@@ -24,6 +26,7 @@ import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.connector.expressions.aggregate.Aggregation
import org.apache.spark.sql.connector.read.PartitionReaderFactory
import org.apache.spark.sql.execution.datasources.{AggregatePushDownUtils,
PartitioningAwareFileIndex}
+import org.apache.spark.sql.execution.datasources.orc.OrcOptions
import org.apache.spark.sql.execution.datasources.v2.FileScan
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType
@@ -64,7 +67,8 @@ case class OrcScan(
// The partition values are already truncated in `FileScan.partitions`.
// We should use `readPartitionSchema` as the partition schema here.
OrcPartitionReaderFactory(sparkSession.sessionState.conf, broadcastedConf,
- dataSchema, readDataSchema, readPartitionSchema, pushedFilters,
pushedAggregate)
+ dataSchema, readDataSchema, readPartitionSchema, pushedFilters,
pushedAggregate,
+ new OrcOptions(options.asScala.toMap, sparkSession.sessionState.conf))
}
override def equals(obj: Any): Boolean = obj match {
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
index ea4f5e0d287..9a25dd88ff4 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
@@ -56,7 +56,7 @@ import org.apache.spark.util.SerializableConfiguration
* @param partitionSchema Schema of partitions.
* @param filters Filters to be pushed down in the batch scan.
* @param aggregation Aggregation to be pushed down in the batch scan.
- * @param parquetOptions The options of Parquet datasource that are set for
the read.
+ * @param options The options of Parquet datasource that are set for the read.
*/
case class ParquetPartitionReaderFactory(
sqlConf: SQLConf,
@@ -66,7 +66,7 @@ case class ParquetPartitionReaderFactory(
partitionSchema: StructType,
filters: Array[Filter],
aggregation: Option[Aggregation],
- parquetOptions: ParquetOptions) extends FilePartitionReaderFactory with
Logging {
+ options: ParquetOptions) extends FilePartitionReaderFactory with Logging {
private val isCaseSensitive = sqlConf.caseSensitiveAnalysis
private val resultSchema = StructType(partitionSchema.fields ++
readDataSchema.fields)
private val enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled
@@ -81,8 +81,8 @@ case class ParquetPartitionReaderFactory(
private val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal
private val pushDownStringStartWith =
sqlConf.parquetFilterPushDownStringStartWith
private val pushDownInFilterThreshold =
sqlConf.parquetFilterPushDownInFilterThreshold
- private val datetimeRebaseModeInRead =
parquetOptions.datetimeRebaseModeInRead
- private val int96RebaseModeInRead = parquetOptions.int96RebaseModeInRead
+ private val datetimeRebaseModeInRead = options.datetimeRebaseModeInRead
+ private val int96RebaseModeInRead = options.int96RebaseModeInRead
private def getFooter(file: PartitionedFile): ParquetMetadata = {
val conf = broadcastedConf.value.value
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextPartitionReaderFactory.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextPartitionReaderFactory.scala
index 0cd184da6ef..6542c1c2c3e 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextPartitionReaderFactory.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextPartitionReaderFactory.scala
@@ -36,19 +36,19 @@ import org.apache.spark.util.SerializableConfiguration
* @param broadcastedConf Broadcasted serializable Hadoop Configuration.
* @param readDataSchema Required schema in the batch scan.
* @param partitionSchema Schema of partitions.
- * @param textOptions Options for reading a text file.
+ * @param options Options for reading a text file.
* */
case class TextPartitionReaderFactory(
sqlConf: SQLConf,
broadcastedConf: Broadcast[SerializableConfiguration],
readDataSchema: StructType,
partitionSchema: StructType,
- textOptions: TextOptions) extends FilePartitionReaderFactory {
+ options: TextOptions) extends FilePartitionReaderFactory {
override def buildReader(file: PartitionedFile):
PartitionReader[InternalRow] = {
val confValue = broadcastedConf.value.value
- val reader = if (!textOptions.wholeText) {
- new HadoopFileLinesReader(file, textOptions.lineSeparatorInRead,
confValue)
+ val reader = if (!options.wholeText) {
+ new HadoopFileLinesReader(file, options.lineSeparatorInRead, confValue)
} else {
new HadoopFileWholeTextReader(file, confValue)
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
index 9e7eb1d0ad5..5011a7713a5 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -181,7 +181,7 @@ class FileBasedDataSourceSuite extends QueryTest
allFileBasedDataSources.foreach { format =>
testQuietly(s"Enabling/disabling ignoreMissingFiles using $format") {
- def testIgnoreMissingFiles(): Unit = {
+ def testIgnoreMissingFiles(options: Map[String, String]): Unit = {
withTempDir { dir =>
val basePath = dir.getCanonicalPath
@@ -197,7 +197,7 @@ class FileBasedDataSourceSuite extends QueryTest
fs.listStatus(p).filter(_.isFile).map(_.getPath)
}
- val df = spark.read.format(format).load(
+ val df = spark.read.options(options).format(format).load(
new Path(basePath, "first").toString,
new Path(basePath, "second").toString,
new Path(basePath, "third").toString,
@@ -214,20 +214,27 @@ class FileBasedDataSourceSuite extends QueryTest
}
}
+ // Test set ignoreMissingFiles via SQL Conf and Data Source reader
options
for {
- ignore <- Seq("true", "false")
+ (ignore, options, sqlConf) <- Seq(
+ // Set via SQL Conf: leave options empty
+ ("true", Map.empty[String, String], "true"),
+ ("false", Map.empty[String, String], "false"),
+ // Set via reader options: explicitly set SQL Conf to opposite
+ ("true", Map("ignoreMissingFiles" -> "true"), "false"),
+ ("false", Map("ignoreMissingFiles" -> "false"), "true"))
sources <- Seq("", format)
} {
- withSQLConf(SQLConf.IGNORE_MISSING_FILES.key -> ignore,
- SQLConf.USE_V1_SOURCE_LIST.key -> sources) {
- if (ignore.toBoolean) {
- testIgnoreMissingFiles()
- } else {
- val exception = intercept[SparkException] {
- testIgnoreMissingFiles()
- }
- assert(exception.getMessage().contains("does not exist"))
+ withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> sources,
+ SQLConf.IGNORE_MISSING_FILES.key -> sqlConf) {
+ if (ignore.toBoolean) {
+ testIgnoreMissingFiles(options)
+ } else {
+ val exception = intercept[SparkException] {
+ testIgnoreMissingFiles(options)
}
+ assert(exception.getMessage().contains("does not exist"))
+ }
}
}
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index 64944054326..426d477d38f 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -306,13 +306,13 @@ abstract class ParquetQuerySuite extends QueryTest with
ParquetTest with SharedS
}
test("Enabling/disabling ignoreCorruptFiles") {
- def testIgnoreCorruptFiles(): Unit = {
+ def testIgnoreCorruptFiles(options: Map[String, String]): Unit = {
withTempDir { dir =>
val basePath = dir.getCanonicalPath
spark.range(1).toDF("a").write.parquet(new Path(basePath,
"first").toString)
spark.range(1, 2).toDF("a").write.parquet(new Path(basePath,
"second").toString)
spark.range(2, 3).toDF("a").write.json(new Path(basePath,
"third").toString)
- val df = spark.read.parquet(
+ val df = spark.read.options(options).parquet(
new Path(basePath, "first").toString,
new Path(basePath, "second").toString,
new Path(basePath, "third").toString)
@@ -320,13 +320,13 @@ abstract class ParquetQuerySuite extends QueryTest with
ParquetTest with SharedS
}
}
- def testIgnoreCorruptFilesWithoutSchemaInfer(): Unit = {
+ def testIgnoreCorruptFilesWithoutSchemaInfer(options: Map[String,
String]): Unit = {
withTempDir { dir =>
val basePath = dir.getCanonicalPath
spark.range(1).toDF("a").write.parquet(new Path(basePath,
"first").toString)
spark.range(1, 2).toDF("a").write.parquet(new Path(basePath,
"second").toString)
spark.range(2, 3).toDF("a").write.json(new Path(basePath,
"third").toString)
- val df = spark.read.schema("a long").parquet(
+ val df = spark.read.options(options).schema("a long").parquet(
new Path(basePath, "first").toString,
new Path(basePath, "second").toString,
new Path(basePath, "third").toString)
@@ -334,20 +334,39 @@ abstract class ParquetQuerySuite extends QueryTest with
ParquetTest with SharedS
}
}
- withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") {
- testIgnoreCorruptFiles()
- testIgnoreCorruptFilesWithoutSchemaInfer()
+ // Test ignoreCorruptFiles = true
+ Seq("SQLConf", "FormatOption").foreach { by =>
+ val (sqlConf, options) = by match {
+ case "SQLConf" => ("true", Map.empty[String, String])
+ // Explicitly set SQLConf to false but still should ignore corrupt
files
+ case "FormatOption" => ("false", Map("ignoreCorruptFiles" -> "true"))
+ }
+ withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> sqlConf) {
+ testIgnoreCorruptFiles(options)
+ testIgnoreCorruptFilesWithoutSchemaInfer(options)
+ }
}
- withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") {
- val exception = intercept[SparkException] {
- testIgnoreCorruptFiles()
+ // Test ignoreCorruptFiles = false
+ Seq("SQLConf", "FormatOption").foreach { by =>
+ val (sqlConf, options) = by match {
+ case "SQLConf" => ("false", Map.empty[String, String])
+ // Explicitly set SQLConf to true but still should not ignore corrupt
files
+ case "FormatOption" => ("true", Map("ignoreCorruptFiles" -> "false"))
}
- assert(exception.getMessage().contains("is not a Parquet file"))
- val exception2 = intercept[SparkException] {
- testIgnoreCorruptFilesWithoutSchemaInfer()
+
+ withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> sqlConf) {
+ withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") {
+ val exception = intercept[SparkException] {
+ testIgnoreCorruptFiles(options)
+ }
+ assert(exception.getMessage().contains("is not a Parquet file"))
+ val exception2 = intercept[SparkException] {
+ testIgnoreCorruptFilesWithoutSchemaInfer(options)
+ }
+ assert(exception2.getMessage().contains("is not a Parquet file"))
+ }
}
- assert(exception2.getMessage().contains("is not a Parquet file"))
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]