This is an automated email from the ASF dual-hosted git repository.
maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 60d02b444e2 [SPARK-45316][CORE][SQL] Add new parameters `ignoreCorruptFiles`/`ignoreMissingFiles` to `HadoopRDD` and `NewHadoopRDD`
60d02b444e2 is described below
commit 60d02b444e2225b3afbe4955dabbea505e9f769c
Author: Max Gekk <[email protected]>
AuthorDate: Tue Sep 26 17:33:07 2023 +0300
[SPARK-45316][CORE][SQL] Add new parameters `ignoreCorruptFiles`/`ignoreMissingFiles` to `HadoopRDD` and `NewHadoopRDD`
### What changes were proposed in this pull request?
In the PR, I propose to add the new parameters
`ignoreCorruptFiles`/`ignoreMissingFiles` to `HadoopRDD` and `NewHadoopRDD`,
and to set them to the current value of:
- `spark.files.ignoreCorruptFiles`/`ignoreMissingFiles` in Spark `core`,
- `spark.sql.files.ignoreCorruptFiles`/`ignoreMissingFiles` when the RDDs are
created in Spark SQL (see the sketch below).
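For illustration, here is a minimal sketch of passing the new flags explicitly to the `NewHadoopRDD` primary constructor. The `SparkContext` value `sc`, the input path, and the chosen input format are illustrative; `SparkContext.newAPIHadoopRDD()` remains the recommended entry point, and the pre-existing constructors still default the flags from the configs listed above:
```
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, TextInputFormat}
import org.apache.spark.rdd.NewHadoopRDD

// Illustrative Hadoop configuration; the input directory is a placeholder.
val hadoopConf = new Configuration(sc.hadoopConfiguration)
hadoopConf.set(FileInputFormat.INPUT_DIR, "/data/events")

// New primary constructor: the flags are passed explicitly instead of being
// read from spark.files.ignoreCorruptFiles/ignoreMissingFiles.
val rdd = new NewHadoopRDD[LongWritable, Text](
  sc,
  classOf[TextInputFormat],
  classOf[LongWritable],
  classOf[Text],
  hadoopConf,
  ignoreCorruptFiles = false,
  ignoreMissingFiles = true)
```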
### Why are the changes needed?
1. To make `HadoopRDD` and `NewHadoopRDD` consistent with other RDDs created by
Spark SQL, such as `FileScanRDD`, which take into account the SQL configs
`spark.sql.files.ignoreCorruptFiles`/`ignoreMissingFiles`.
2. To improve the user experience with Spark SQL: users can control the ignoring
of missing files without re-creating the Spark context.
### Does this PR introduce _any_ user-facing change?
Yes. `HadoopRDD`/`NewHadoopRDD` invoked from SQL code, such as Hive table scans,
now respect the SQL configs
`spark.sql.files.ignoreCorruptFiles`/`ignoreMissingFiles` and no longer respect
the core configs `spark.files.ignoreCorruptFiles`/`ignoreMissingFiles`.
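As a hedged usage sketch (the `spark` session and table name are illustrative), the behavior can now be controlled per query without re-creating the Spark context:
```
// The SQL config is consulted when the scan RDD is created, so it can be
// toggled between queries in the same session. Table name is a placeholder.
spark.conf.set("spark.sql.files.ignoreMissingFiles", "true")
spark.sql("SELECT count(*) FROM hive_partitioned_table").show()

// The core config is no longer consulted for such SQL table scans:
// spark.conf.set("spark.files.ignoreMissingFiles", "true")  // has no effect on SQL reads
```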
### How was this patch tested?
By running the affected tests:
```
$ build/sbt "test:testOnly *QueryPartitionSuite"
$ build/sbt "test:testOnly *FileSuite"
$ build/sbt "test:testOnly *FileBasedDataSourceSuite"
```
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #43097 from MaxGekk/dynamic-ignoreMissingFiles.
Authored-by: Max Gekk <[email protected]>
Signed-off-by: Max Gekk <[email protected]>
---
.../scala/org/apache/spark/rdd/HadoopRDD.scala | 31 ++++++++++++++++++----
.../scala/org/apache/spark/rdd/NewHadoopRDD.scala | 27 +++++++++++++++----
docs/sql-migration-guide.md | 1 +
.../org/apache/spark/sql/hive/TableReader.scala | 9 ++++---
.../spark/sql/hive/QueryPartitionSuite.scala | 6 ++---
5 files changed, 58 insertions(+), 16 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index cad107256c5..0b5f6a3d716 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -89,6 +89,8 @@ private[spark] class HadoopPartition(rddId: Int, override val index: Int, s: Inp
* @param keyClass Class of the key associated with the inputFormatClass.
* @param valueClass Class of the value associated with the inputFormatClass.
* @param minPartitions Minimum number of HadoopRDD partitions (Hadoop Splits) to generate.
+ * @param ignoreCorruptFiles Whether to ignore corrupt files.
+ * @param ignoreMissingFiles Whether to ignore missing files.
*
* @note Instantiating this class directly is not recommended, please use
* `org.apache.spark.SparkContext.hadoopRDD()`
@@ -101,13 +103,36 @@ class HadoopRDD[K, V](
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
- minPartitions: Int)
+ minPartitions: Int,
+ ignoreCorruptFiles: Boolean,
+ ignoreMissingFiles: Boolean)
extends RDD[(K, V)](sc, Nil) with Logging {
if (initLocalJobConfFuncOpt.isDefined) {
sparkContext.clean(initLocalJobConfFuncOpt.get)
}
+ def this(
+ sc: SparkContext,
+ broadcastedConf: Broadcast[SerializableConfiguration],
+ initLocalJobConfFuncOpt: Option[JobConf => Unit],
+ inputFormatClass: Class[_ <: InputFormat[K, V]],
+ keyClass: Class[K],
+ valueClass: Class[V],
+ minPartitions: Int) = {
+ this(
+ sc,
+ broadcastedConf,
+ initLocalJobConfFuncOpt,
+ inputFormatClass,
+ keyClass,
+ valueClass,
+ minPartitions,
+ ignoreCorruptFiles = sc.conf.get(IGNORE_CORRUPT_FILES),
+ ignoreMissingFiles = sc.conf.get(IGNORE_MISSING_FILES)
+ )
+ }
+
def this(
sc: SparkContext,
conf: JobConf,
@@ -135,10 +160,6 @@ class HadoopRDD[K, V](
private val shouldCloneJobConf = sparkContext.conf.getBoolean("spark.hadoop.cloneConf", false)
- private val ignoreCorruptFiles = sparkContext.conf.get(IGNORE_CORRUPT_FILES)
-
- private val ignoreMissingFiles = sparkContext.conf.get(IGNORE_MISSING_FILES)
-
private val ignoreEmptySplits = sparkContext.conf.get(HADOOP_RDD_IGNORE_EMPTY_SPLITS)
// Returns a JobConf that will be used on executors to obtain input splits for Hadoop reads.
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 119fdae531f..17ef3214889 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -64,6 +64,8 @@ private[spark] class NewHadoopPartition(
* @param inputFormatClass Storage format of the data to be read.
* @param keyClass Class of the key associated with the inputFormatClass.
* @param valueClass Class of the value associated with the inputFormatClass.
+ * @param ignoreCorruptFiles Whether to ignore corrupt files.
+ * @param ignoreMissingFiles Whether to ignore missing files.
*
* @note Instantiating this class directly is not recommended, please use
* `org.apache.spark.SparkContext.newAPIHadoopRDD()`
@@ -74,9 +76,28 @@ class NewHadoopRDD[K, V](
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
- @transient private val _conf: Configuration)
+ @transient private val _conf: Configuration,
+ ignoreCorruptFiles: Boolean,
+ ignoreMissingFiles: Boolean)
extends RDD[(K, V)](sc, Nil) with Logging {
+ def this(
+ sc : SparkContext,
+ inputFormatClass: Class[_ <: InputFormat[K, V]],
+ keyClass: Class[K],
+ valueClass: Class[V],
+ _conf: Configuration) = {
+ this(
+ sc,
+ inputFormatClass,
+ keyClass,
+ valueClass,
+ _conf,
+ ignoreCorruptFiles = sc.conf.get(IGNORE_CORRUPT_FILES),
+ ignoreMissingFiles = sc.conf.get(IGNORE_MISSING_FILES))
+ }
+
+
// A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it
private val confBroadcast = sc.broadcast(new SerializableConfiguration(_conf))
// private val serializableConf = new SerializableWritable(_conf)
@@ -90,10 +111,6 @@ class NewHadoopRDD[K, V](
private val shouldCloneJobConf = sparkContext.conf.getBoolean("spark.hadoop.cloneConf", false)
- private val ignoreCorruptFiles = sparkContext.conf.get(IGNORE_CORRUPT_FILES)
-
- private val ignoreMissingFiles = sparkContext.conf.get(IGNORE_MISSING_FILES)
-
private val ignoreEmptySplits = sparkContext.conf.get(HADOOP_RDD_IGNORE_EMPTY_SPLITS)
def getConf: Configuration = {
diff --git a/docs/sql-migration-guide.md b/docs/sql-migration-guide.md
index 61173f58b77..56a3c8292cd 100644
--- a/docs/sql-migration-guide.md
+++ b/docs/sql-migration-guide.md
@@ -25,6 +25,7 @@ license: |
## Upgrading from Spark SQL 3.5 to 4.0
- Since Spark 4.0, the default value of `spark.sql.maxSinglePartitionBytes` is changed from `Long.MaxValue` to `128m`. To restore the previous behavior, set `spark.sql.maxSinglePartitionBytes` to `9223372036854775807`(`Long.MaxValue`).
+- Since Spark 4.0, any read of SQL tables takes into consideration the SQL configs `spark.sql.files.ignoreCorruptFiles`/`spark.sql.files.ignoreMissingFiles` instead of the core config `spark.files.ignoreCorruptFiles`/`spark.files.ignoreMissingFiles`.
## Upgrading from Spark SQL 3.4 to 3.5
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index cd1d236dd36..5bb982624b0 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -366,7 +366,9 @@ class HadoopTableReader(
inputFormatClass,
classOf[Writable],
classOf[Writable],
- _minSplitsPerRDD)
+ _minSplitsPerRDD,
+ ignoreCorruptFiles = conf.ignoreCorruptFiles,
+ ignoreMissingFiles = conf.ignoreMissingFiles)
// Only take the value (skip the key) because Hive works only with values.
rdd.map(_._2)
@@ -400,8 +402,9 @@ class HadoopTableReader(
inputFormatClass,
classOf[Writable],
classOf[Writable],
- jobConf
- )
+ jobConf,
+ ignoreCorruptFiles = conf.ignoreCorruptFiles,
+ ignoreMissingFiles = conf.ignoreMissingFiles)
// Only take the value (skip the key) because Hive works only with values.
rdd.map(_._2)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala
index cec6ec1ee12..f4fb18119fa 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.hive
import java.sql.Timestamp
-import org.apache.spark.internal.config._
import org.apache.spark.sql._
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.SQLConf
@@ -73,8 +72,9 @@ class QueryPartitionSuite extends QueryTest with SQLTestUtils with TestHiveSingl
}
test("Replace spark.sql.hive.verifyPartitionPath by
spark.files.ignoreMissingFiles") {
- withSQLConf(SQLConf.HIVE_VERIFY_PARTITION_PATH.key -> "false") {
- sparkContext.conf.set(IGNORE_MISSING_FILES.key, "true")
+ withSQLConf(
+ SQLConf.HIVE_VERIFY_PARTITION_PATH.key -> "false",
+ SQLConf.IGNORE_MISSING_FILES.key -> "true") {
queryWhenPathNotExist()
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]