This is an automated email from the ASF dual-hosted git repository.
yangjie01 pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push:
new 92e650c9ccab [SPARK-50545][CORE][SQL][3.5] `AccessControlException`
should be thrown even if `ignoreCorruptFiles` is enabled
92e650c9ccab is described below
commit 92e650c9ccab7a5f4aa25af2d6c0d6052dfe576b
Author: Cheng Pan <[email protected]>
AuthorDate: Thu Dec 12 20:47:18 2024 +0800
[SPARK-50545][CORE][SQL][3.5] `AccessControlException` should be thrown
even if `ignoreCorruptFiles` is enabled
Cherry-pick https://github.com/apache/spark/issues/49143 to branch-3.5
### What changes were proposed in this pull request?
`AccessControlException` extends `IOException` but we should not treat it
as a data corruption issue.
This is similar to SPARK-50483 which handles `BlockMissingException` in the
same way.
```
2024-12-11 06:29:05 WARN HadoopRDD: Skipped the rest content in the
corrupted file:
hdfs://hadoop-master1.orb.local:8020/warehouse/region/part-00000-2dc8a6f6-8cea-4652-8ba1-762c1b65e2b4-c000:192+192
org.apache.hadoop.security.AccessControlException: Permission denied:
user=hive, access=READ,
inode="/warehouse/region/part-00000-2dc8a6f6-8cea-4652-8ba1-762c1b65e2b4-c000":kyuubi.hadoop:hadoop:-rw-------
at
org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:506)
```
<img width="1462" alt="image"
src="https://github.com/user-attachments/assets/d3a64578-90c6-49bb-b92f-7c5c71451a9b">
### Why are the changes needed?
Avoid data issues when `AccessControlException` occurs while
`ignoreCorruptFiles` is enabled.
### Does this PR introduce _any_ user-facing change?
Yes.
### How was this patch tested?
Manual test.
Task fails with `org.apache.hadoop.security.AccessControlException` even
with `spark.sql.files.ignoreCorruptFiles=true` and
`spark.files.ignoreCorruptFiles=true`. <img width="1477" alt="image"
src="https://github.com/user-attachments/assets/373ad5fc-15f5-486f-aba3-53b7f7af3b13">
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #49162 from pan3793/SPARK-50545-3.5.
Authored-by: Cheng Pan <[email protected]>
Signed-off-by: yangjie01 <[email protected]>
---
core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala | 5 +++--
core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala | 5 +++--
.../org/apache/spark/sql/execution/datasources/FileScanRDD.scala | 3 ++-
.../spark/sql/execution/datasources/v2/FilePartitionReader.scala | 5 +++--
4 files changed, 11 insertions(+), 7 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index edd07a2649db..8aa7d54fd61b 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -31,6 +31,7 @@ import org.apache.hadoop.mapred._
import org.apache.hadoop.mapred.lib.CombineFileSplit
import org.apache.hadoop.mapreduce.TaskType
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
+import org.apache.hadoop.security.AccessControlException
import org.apache.hadoop.util.ReflectionUtils
import org.apache.spark._
@@ -294,7 +295,7 @@ class HadoopRDD[K, V](
null
// Throw FileNotFoundException even if `ignoreCorruptFiles` is true
case e: FileNotFoundException if !ignoreMissingFiles => throw e
- case e: BlockMissingException => throw e
+ case e @ (_ : AccessControlException | _ : BlockMissingException) =>
throw e
case e: IOException if ignoreCorruptFiles =>
logWarning(s"Skipped the rest content in the corrupted file:
${split.inputSplit}", e)
finished = true
@@ -320,7 +321,7 @@ class HadoopRDD[K, V](
finished = true
// Throw FileNotFoundException even if `ignoreCorruptFiles` is true
case e: FileNotFoundException if !ignoreMissingFiles => throw e
- case e: BlockMissingException => throw e
+ case e @ (_ : AccessControlException | _ : BlockMissingException) =>
throw e
case e: IOException if ignoreCorruptFiles =>
logWarning(s"Skipped the rest content in the corrupted file:
${split.inputSplit}", e)
finished = true
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index fbd2235aabaf..7fc93806998b 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -32,6 +32,7 @@ import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.input.{CombineFileSplit,
FileInputFormat, FileSplit, InvalidInputException}
import org.apache.hadoop.mapreduce.task.{JobContextImpl,
TaskAttemptContextImpl}
+import org.apache.hadoop.security.AccessControlException
import org.apache.spark._
import org.apache.spark.annotation.DeveloperApi
@@ -228,7 +229,7 @@ class NewHadoopRDD[K, V](
null
// Throw FileNotFoundException even if `ignoreCorruptFiles` is true
case e: FileNotFoundException if !ignoreMissingFiles => throw e
- case e: BlockMissingException => throw e
+ case e @ (_ : AccessControlException | _ : BlockMissingException) =>
throw e
case e: IOException if ignoreCorruptFiles =>
logWarning(
s"Skipped the rest content in the corrupted file:
${split.serializableHadoopSplit}",
@@ -257,7 +258,7 @@ class NewHadoopRDD[K, V](
finished = true
// Throw FileNotFoundException even if `ignoreCorruptFiles` is true
case e: FileNotFoundException if !ignoreMissingFiles => throw e
- case e: BlockMissingException => throw e
+ case e @ (_ : AccessControlException | _ : BlockMissingException)
=> throw e
case e: IOException if ignoreCorruptFiles =>
logWarning(
s"Skipped the rest content in the corrupted file:
${split.serializableHadoopSplit}",
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
index ce56fc1b2829..8f6f981ec6a7 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -24,6 +24,7 @@ import scala.util.control.NonFatal
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hdfs.BlockMissingException
+import org.apache.hadoop.security.AccessControlException
import org.apache.spark.{Partition => RDDPartition, SparkUpgradeException,
TaskContext}
import org.apache.spark.deploy.SparkHadoopUtil
@@ -260,7 +261,7 @@ class FileScanRDD(
null
// Throw FileNotFoundException even if `ignoreCorruptFiles`
is true
case e: FileNotFoundException if !ignoreMissingFiles =>
throw e
- case e: BlockMissingException => throw e
+ case e @ (_ : AccessControlException | _ :
BlockMissingException) => throw e
case e @ (_: RuntimeException | _: IOException) if
ignoreCorruptFiles =>
logWarning(
s"Skipped the rest of the content in the corrupted file:
$currentFile", e)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReader.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReader.scala
index 8f51226dcfe9..6a63d8268c3b 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReader.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReader.scala
@@ -21,6 +21,7 @@ import java.io.{FileNotFoundException, IOException}
import scala.util.control.NonFatal
import org.apache.hadoop.hdfs.BlockMissingException
+import org.apache.hadoop.security.AccessControlException
import org.apache.spark.SparkUpgradeException
import org.apache.spark.internal.Logging
@@ -51,7 +52,7 @@ class FilePartitionReader[T](
// Throw FileNotFoundException even if `ignoreCorruptFiles` is true
case e: FileNotFoundException if !ignoreMissingFiles =>
throw QueryExecutionErrors.fileNotFoundError(e)
- case e: BlockMissingException => throw e
+ case e @ (_ : AccessControlException | _ : BlockMissingException) =>
throw e
case e @ (_: RuntimeException | _: IOException) if
ignoreCorruptFiles =>
logWarning(
s"Skipped the rest of the content in the corrupted file.", e)
@@ -71,7 +72,7 @@ class FilePartitionReader[T](
throw QueryExecutionErrors.unsupportedSchemaColumnConvertError(
currentReader.file.urlEncodedPath,
e.getColumn, e.getLogicalType, e.getPhysicalType, e)
- case e: BlockMissingException => throw e
+ case e @ (_ : AccessControlException | _ : BlockMissingException) =>
throw e
case e @ (_: RuntimeException | _: IOException) if ignoreCorruptFiles =>
logWarning(
s"Skipped the rest of the content in the corrupted file:
$currentReader", e)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]