This is an automated email from the ASF dual-hosted git repository.

yangjie01 pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new 92e650c9ccab [SPARK-50545][CORE][SQL][3.5] `AccessControlException` 
should be thrown even if `ignoreCorruptFiles` is enabled
92e650c9ccab is described below

commit 92e650c9ccab7a5f4aa25af2d6c0d6052dfe576b
Author: Cheng Pan <[email protected]>
AuthorDate: Thu Dec 12 20:47:18 2024 +0800

    [SPARK-50545][CORE][SQL][3.5] `AccessControlException` should be thrown 
even if `ignoreCorruptFiles` is enabled
    
    Cherry-pick https://github.com/apache/spark/issues/49143 to branch-3.5
    
    ### What changes were proposed in this pull request?
    
    `AccessControlException` extends `IOException` but we should not treat it 
as a data corruption issue.
    
    This is similar to SPARK-50483 which handles `BlockMissingException` in the 
same way.
    
    ```
    2024-12-11 06:29:05 WARN HadoopRDD: Skipped the rest content in the 
corrupted file: 
hdfs://hadoop-master1.orb.local:8020/warehouse/region/part-00000-2dc8a6f6-8cea-4652-8ba1-762c1b65e2b4-c000:192+192
    org.apache.hadoop.security.AccessControlException: Permission denied: 
user=hive, access=READ, 
inode="/warehouse/region/part-00000-2dc8a6f6-8cea-4652-8ba1-762c1b65e2b4-c000":kyuubi.hadoop:hadoop:-rw-------
            at 
org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:506)
    ```
    <img width="1462" alt="image" 
src="https://github.com/user-attachments/assets/d3a64578-90c6-49bb-b92f-7c5c71451a9b">
    
    ### Why are the changes needed?
    
    Avoid data issues when `ignoreCorruptFiles` is enabled and 
`AccessControlException` occurs.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes.
    
    ### How was this patch tested?
    
    Manual test.
    
    Task fails with `org.apache.hadoop.security.AccessControlException` even 
with `spark.sql.files.ignoreCorruptFiles=true` and 
`spark.files.ignoreCorruptFiles=true` <img width="1477" alt="image" 
src="https://github.com/user-attachments/assets/373ad5fc-15f5-486f-aba3-53b7f7af3b13">
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #49162 from pan3793/SPARK-50545-3.5.
    
    Authored-by: Cheng Pan <[email protected]>
    Signed-off-by: yangjie01 <[email protected]>
---
 core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala             | 5 +++--
 core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala          | 5 +++--
 .../org/apache/spark/sql/execution/datasources/FileScanRDD.scala     | 3 ++-
 .../spark/sql/execution/datasources/v2/FilePartitionReader.scala     | 5 +++--
 4 files changed, 11 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index edd07a2649db..8aa7d54fd61b 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -31,6 +31,7 @@ import org.apache.hadoop.mapred._
 import org.apache.hadoop.mapred.lib.CombineFileSplit
 import org.apache.hadoop.mapreduce.TaskType
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
+import org.apache.hadoop.security.AccessControlException
 import org.apache.hadoop.util.ReflectionUtils
 
 import org.apache.spark._
@@ -294,7 +295,7 @@ class HadoopRDD[K, V](
             null
           // Throw FileNotFoundException even if `ignoreCorruptFiles` is true
           case e: FileNotFoundException if !ignoreMissingFiles => throw e
-          case e: BlockMissingException => throw e
+          case e @ (_ : AccessControlException | _ : BlockMissingException) => 
throw e
           case e: IOException if ignoreCorruptFiles =>
             logWarning(s"Skipped the rest content in the corrupted file: 
${split.inputSplit}", e)
             finished = true
@@ -320,7 +321,7 @@ class HadoopRDD[K, V](
             finished = true
           // Throw FileNotFoundException even if `ignoreCorruptFiles` is true
           case e: FileNotFoundException if !ignoreMissingFiles => throw e
-          case e: BlockMissingException => throw e
+          case e @ (_ : AccessControlException | _ : BlockMissingException) => 
throw e
           case e: IOException if ignoreCorruptFiles =>
             logWarning(s"Skipped the rest content in the corrupted file: 
${split.inputSplit}", e)
             finished = true
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index fbd2235aabaf..7fc93806998b 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -32,6 +32,7 @@ import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.lib.input.{CombineFileSplit, 
FileInputFormat, FileSplit, InvalidInputException}
 import org.apache.hadoop.mapreduce.task.{JobContextImpl, 
TaskAttemptContextImpl}
+import org.apache.hadoop.security.AccessControlException
 
 import org.apache.spark._
 import org.apache.spark.annotation.DeveloperApi
@@ -228,7 +229,7 @@ class NewHadoopRDD[K, V](
             null
           // Throw FileNotFoundException even if `ignoreCorruptFiles` is true
           case e: FileNotFoundException if !ignoreMissingFiles => throw e
-          case e: BlockMissingException => throw e
+          case e @ (_ : AccessControlException | _ : BlockMissingException) => 
throw e
           case e: IOException if ignoreCorruptFiles =>
             logWarning(
               s"Skipped the rest content in the corrupted file: 
${split.serializableHadoopSplit}",
@@ -257,7 +258,7 @@ class NewHadoopRDD[K, V](
               finished = true
             // Throw FileNotFoundException even if `ignoreCorruptFiles` is true
             case e: FileNotFoundException if !ignoreMissingFiles => throw e
-            case e: BlockMissingException => throw e
+            case e @ (_ : AccessControlException | _ : BlockMissingException) 
=> throw e
             case e: IOException if ignoreCorruptFiles =>
               logWarning(
                 s"Skipped the rest content in the corrupted file: 
${split.serializableHadoopSplit}",
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
index ce56fc1b2829..8f6f981ec6a7 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -24,6 +24,7 @@ import scala.util.control.NonFatal
 
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.hdfs.BlockMissingException
+import org.apache.hadoop.security.AccessControlException
 
 import org.apache.spark.{Partition => RDDPartition, SparkUpgradeException, 
TaskContext}
 import org.apache.spark.deploy.SparkHadoopUtil
@@ -260,7 +261,7 @@ class FileScanRDD(
                     null
                   // Throw FileNotFoundException even if `ignoreCorruptFiles` 
is true
                   case e: FileNotFoundException if !ignoreMissingFiles => 
throw e
-                  case e: BlockMissingException => throw e
+                  case e @ (_ : AccessControlException | _ : 
BlockMissingException) => throw e
                   case e @ (_: RuntimeException | _: IOException) if 
ignoreCorruptFiles =>
                     logWarning(
                       s"Skipped the rest of the content in the corrupted file: 
$currentFile", e)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReader.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReader.scala
index 8f51226dcfe9..6a63d8268c3b 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReader.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReader.scala
@@ -21,6 +21,7 @@ import java.io.{FileNotFoundException, IOException}
 import scala.util.control.NonFatal
 
 import org.apache.hadoop.hdfs.BlockMissingException
+import org.apache.hadoop.security.AccessControlException
 
 import org.apache.spark.SparkUpgradeException
 import org.apache.spark.internal.Logging
@@ -51,7 +52,7 @@ class FilePartitionReader[T](
           // Throw FileNotFoundException even if `ignoreCorruptFiles` is true
           case e: FileNotFoundException if !ignoreMissingFiles =>
             throw QueryExecutionErrors.fileNotFoundError(e)
-          case e: BlockMissingException => throw e
+          case e @ (_ : AccessControlException | _ : BlockMissingException) => 
throw e
           case e @ (_: RuntimeException | _: IOException) if 
ignoreCorruptFiles =>
             logWarning(
               s"Skipped the rest of the content in the corrupted file.", e)
@@ -71,7 +72,7 @@ class FilePartitionReader[T](
         throw QueryExecutionErrors.unsupportedSchemaColumnConvertError(
           currentReader.file.urlEncodedPath,
           e.getColumn, e.getLogicalType, e.getPhysicalType, e)
-      case e: BlockMissingException => throw e
+      case e @ (_ : AccessControlException | _ : BlockMissingException) => 
throw e
       case e @ (_: RuntimeException | _: IOException) if ignoreCorruptFiles =>
         logWarning(
           s"Skipped the rest of the content in the corrupted file: 
$currentReader", e)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to