This is an automated email from the ASF dual-hosted git repository.

yangjie01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new cf3ef31bdfb0 [SPARK-52287][CORE] Improve `SparkContext` not to populate `o.a.s.internal.io.cloud.*`-related settings if the classes do not exist
cf3ef31bdfb0 is described below

commit cf3ef31bdfb0e9e32ddee96178ab68bcdf8d798c
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Sat May 24 13:07:51 2025 +0800

    [SPARK-52287][CORE] Improve `SparkContext` not to populate `o.a.s.internal.io.cloud.*`-related settings if the classes do not exist
    
    ### What changes were proposed in this pull request?
    
    This is an improvement to prevent Spark from throwing confusing exceptions to users.
    
    Technically, this fixes a regression in Apache Spark 4.0.0 relative to 3.5.5.
    
    **Apache Spark 3.5.5**
    ```
    $ bin/spark-shell -c "spark.hadoop.fs.s3a.bucket.*.committer.magic.enabled=true"
    scala> spark.range(1).count
    res0: Long = 1
    ```
    
    **Apache Spark 4.0.0**
    ```
    $ bin/spark-shell -c "spark.hadoop.fs.s3a.bucket.*.committer.magic.enabled=true"
    scala> spark.range(1).count
    ...
    Caused by: java.lang.IllegalArgumentException:
    'org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter'
    in spark.sql.parquet.output.committer.class is invalid.
    Class must be loadable and subclass of org.apache.hadoop.mapreduce.OutputCommitter
    ...
    ```
    
    **After this PR**
    ```
    $ bin/spark-shell -c "spark.hadoop.fs.s3a.bucket.*.committer.magic.enabled=true"
    ...
    Welcome to
          ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
       /___/ .__/\_,_/_/ /_/\_\   version 4.1.0-SNAPSHOT
          /_/
    
    Using Scala version 2.13.16 (OpenJDK 64-Bit Server VM, Java 17.0.15)
    ...
    scala> spark.range(1).count()
    val res0: Long = 1
    ```
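    On such a distribution, one can also confirm that none of the committer settings were populated. A hypothetical REPL check (`sc.getConf.contains` is the real `SparkConf` API; the session output is illustrative):
    ```
    scala> sc.getConf.contains("spark.hadoop.fs.s3a.committer.name")
    val res1: Boolean = false
    ```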
    
    ### Why are the changes needed?
    
    Since Apache Spark 3.2.0, Apache Spark has helped users by allowing a single configuration, `spark.hadoop.fs.s3a.bucket.<bucket>.committer.magic.enabled=true`, to enable the S3A magic committer, automatically populating the required settings listed below.
    - #32518
    
    ```
    spark.hadoop.fs.s3a.committer.magic.enabled=true
    spark.hadoop.fs.s3a.committer.name=magic
    spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a=org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory
    spark.sql.parquet.output.committer.class=org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter
    spark.sql.sources.commitProtocolClass=org.apache.spark.internal.io.cloud.PathOutputCommitProtocol
    ```
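    For reference, here is a simplified sketch of that population logic (not the exact `SparkContext` code; the helper name `fillMissingMagicCommitterConfs` is illustrative, while `getAllWithPrefix` and `setIfMissing` are real `SparkConf` methods):
    ```
    import org.apache.spark.SparkConf

    // Sketch: fan the single bucket-level flag out into the full set of
    // magic-committer settings, respecting explicit user overrides.
    def fillMissingMagicCommitterConfs(conf: SparkConf): Unit = {
      val magicEnabled = conf
        .getAllWithPrefix("spark.hadoop.fs.s3a.bucket.")
        .filter(_._1.endsWith(".committer.magic.enabled"))
        .exists(_._2.equalsIgnoreCase("true"))
      if (magicEnabled) {
        // setIfMissing never overwrites a value the user set explicitly.
        conf.setIfMissing("spark.hadoop.fs.s3a.committer.magic.enabled", "true")
        if (conf.get("spark.hadoop.fs.s3a.committer.magic.enabled").equals("true")) {
          conf.setIfMissing("spark.hadoop.fs.s3a.committer.name", "magic")
          conf.setIfMissing("spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a",
            "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory")
          conf.setIfMissing("spark.sql.parquet.output.committer.class",
            "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter")
          conf.setIfMissing("spark.sql.sources.commitProtocolClass",
            "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")
        }
      }
    }
    ```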
    
    However, this assumes that users built their distribution with `-Phadoop-cloud`. Some distributions, such as the Apache Spark binary distribution, are not built with `-Phadoop-cloud`, so they do not have the `org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter` and `org.apache.spark.internal.io.cloud.PathOutputCommitProtocol` classes.
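    A minimal sketch of the guard this PR adds, assuming that resolving the class without initializing it is a sufficient check (Spark's internal `Utils.classIsLoadable` does essentially this; the standalone helper below is illustrative):
    ```
    // Illustrative stand-in for Spark's internal Utils.classIsLoadable:
    // resolve the class without initializing it.
    def classIsLoadable(name: String): Boolean =
      try {
        Class.forName(name, false, Thread.currentThread().getContextClassLoader)
        true
      } catch {
        case _: ClassNotFoundException => false
      }

    val cloudCommitterClassesPresent = Seq(
      "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter",
      "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol"
    ).forall(classIsLoadable)
    ```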
    
    ### Does this PR introduce _any_ user-facing change?
    
    - This is a regression fix for Apache Spark 4.0.0 from 3.5.5.
    - It only happens when a user tries to use the `S3A` magic committer on a Spark distribution built without `-Phadoop-cloud`.
    
    ### How was this patch tested?
    
    Pass the CIs with the newly added test case.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #51005 from dongjoon-hyun/SPARK-52287.
    
    Authored-by: Dongjoon Hyun <[email protected]>
    Signed-off-by: yangjie01 <[email protected]>
---
 .../main/scala/org/apache/spark/SparkContext.scala |  4 +-
 .../scala/org/apache/spark/SparkContextSuite.scala | 20 ++---
 .../scala/org/apache/spark/SparkContextSuite.scala | 91 ++++++++++++++++++++++
 3 files changed, 102 insertions(+), 13 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index a82fd5264337..78f4863adc41 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -3397,7 +3397,9 @@ object SparkContext extends Logging {
       .getAllWithPrefix("spark.hadoop.fs.s3a.bucket.")
       .filter(_._1.endsWith(".committer.magic.enabled"))
       .filter(_._2.equalsIgnoreCase("true"))
-    if (magicCommitterConfs.nonEmpty) {
+    if (magicCommitterConfs.nonEmpty &&
+        Utils.classIsLoadable("org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter") &&
+        Utils.classIsLoadable("org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")) {
       // Try to enable S3 magic committer if missing
       conf.setIfMissing("spark.hadoop.fs.s3a.committer.magic.enabled", "true")
      if (conf.get("spark.hadoop.fs.s3a.committer.magic.enabled").equals("true")) {
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index 6473f823406c..9e5859feefb5 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -1275,6 +1275,13 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
   }
 
   test("SPARK-35383: Fill missing S3A magic committer configs if needed") {
+    Seq(
+      "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter",
+      "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol"
+    ).foreach { className =>
+      assert(!Utils.classIsLoadable(className))
+    }
+
     val c1 = new SparkConf().setAppName("s3a-test").setMaster("local")
     sc = new SparkContext(c1)
     assert(!sc.getConf.contains("spark.hadoop.fs.s3a.committer.name"))
@@ -1287,18 +1294,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
     resetSparkContext()
     val c3 = c1.clone.set("spark.hadoop.fs.s3a.bucket.mybucket.committer.magic.enabled", "true")
     sc = new SparkContext(c3)
-    Seq(
-      "spark.hadoop.fs.s3a.committer.magic.enabled" -> "true",
-      "spark.hadoop.fs.s3a.committer.name" -> "magic",
-      "spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a" ->
-        "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory",
-      "spark.sql.parquet.output.committer.class" ->
-        "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter",
-      "spark.sql.sources.commitProtocolClass" ->
-        "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol"
-    ).foreach { case (k, v) =>
-      assert(v == sc.getConf.get(k))
-    }
+    assert(!sc.getConf.contains("spark.hadoop.fs.s3a.committer.name"))
 
     // Respect a user configuration
     resetSparkContext()
diff --git a/hadoop-cloud/src/test/scala/org/apache/spark/SparkContextSuite.scala b/hadoop-cloud/src/test/scala/org/apache/spark/SparkContextSuite.scala
new file mode 100644
index 000000000000..584793b2b575
--- /dev/null
+++ b/hadoop-cloud/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.util.Utils
+
+class SparkContextSuite extends SparkFunSuite with BeforeAndAfterEach {
+  @transient var sc: SparkContext = _
+
+  override def afterEach(): Unit = {
+    try {
+      if (sc != null) {
+        sc.stop()
+      }
+    } finally {
+      super.afterEach()
+    }
+  }
+
+  test("SPARK-35383: Fill missing S3A magic committer configs if needed") {
+    Seq(
+      "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter",
+      "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol"
+    ).foreach { className =>
+      assert(Utils.classIsLoadable(className))
+    }
+
+    val c1 = new SparkConf().setAppName("s3a-test").setMaster("local")
+    sc = new SparkContext(c1)
+    assert(!sc.getConf.contains("spark.hadoop.fs.s3a.committer.name"))
+    sc.stop()
+
+    val c2 = c1.clone.set("spark.hadoop.fs.s3a.bucket.mybucket.committer.magic.enabled", "false")
+    sc = new SparkContext(c2)
+    assert(!sc.getConf.contains("spark.hadoop.fs.s3a.committer.name"))
+    sc.stop()
+
+    val c3 = c1.clone.set("spark.hadoop.fs.s3a.bucket.mybucket.committer.magic.enabled", "true")
+    sc = new SparkContext(c3)
+    Seq(
+      "spark.hadoop.fs.s3a.committer.magic.enabled" -> "true",
+      "spark.hadoop.fs.s3a.committer.name" -> "magic",
+      "spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a" ->
+        "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory",
+      "spark.sql.parquet.output.committer.class" ->
+        "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter",
+      "spark.sql.sources.commitProtocolClass" ->
+        "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol"
+    ).foreach { case (k, v) =>
+      assert(v == sc.getConf.get(k))
+    }
+    sc.stop()
+
+    // Respect a user configuration
+    val c4 = c1.clone
+      .set("spark.hadoop.fs.s3a.committer.magic.enabled", "false")
+      .set("spark.hadoop.fs.s3a.bucket.mybucket.committer.magic.enabled", "true")
+    sc = new SparkContext(c4)
+    Seq(
+      "spark.hadoop.fs.s3a.committer.magic.enabled" -> "false",
+      "spark.hadoop.fs.s3a.committer.name" -> null,
+      "spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a" -> null,
+      "spark.sql.parquet.output.committer.class" -> null,
+      "spark.sql.sources.commitProtocolClass" -> null
+    ).foreach { case (k, v) =>
+      if (v == null) {
+        assert(!sc.getConf.contains(k))
+      } else {
+        assert(v == sc.getConf.get(k))
+      }
+    }
+    sc.stop()
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
