This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 8149d9bf6c45 [SPARK-47618][CORE] Use Magic Committer for all S3 
buckets by default
8149d9bf6c45 is described below

commit 8149d9bf6c45a5f6d27dbeae98d87b5d2493e109
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Sun May 25 10:12:25 2025 -0700

    [SPARK-47618][CORE] Use Magic Committer for all S3 buckets by default
    
    ### What changes were proposed in this pull request?
    
    This PR aims to use Apache Hadoop `Magic Committer` for all S3 buckets by 
default in Apache Spark 4.1.0.
    
    ### Why are the changes needed?
    
    Apache Hadoop `Magic Committer` has been used for S3 buckets to get the 
best performance since [S3 became fully consistent on December 1st, 
2020](https://aws.amazon.com/blogs/aws/amazon-s3-update-strong-read-after-write-consistency/).
    - 
https://docs.aws.amazon.com/AmazonS3/latest/userguide/Welcome.html#ConsistencyModel
    > Amazon S3 provides strong read-after-write consistency for PUT and DELETE 
requests of objects in your Amazon S3 bucket in all AWS Regions. This behavior 
applies to both writes to new objects as well as PUT requests that overwrite 
existing objects and DELETE requests. In addition, read operations on Amazon S3 
Select, Amazon S3 access controls lists (ACLs), Amazon S3 Object Tags, and 
object metadata (for example, the HEAD object) are strongly consistent.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, the migration guide is updated.
    
    ### How was this patch tested?
    
    Pass the CIs.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #51010 from dongjoon-hyun/SPARK-47618-2.
    
    Authored-by: Dongjoon Hyun <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../src/main/scala/org/apache/spark/SparkContext.scala | 14 ++++----------
 docs/core-migration-guide.md                           |  1 +
 .../scala/org/apache/spark/SparkContextSuite.scala     | 18 +++---------------
 3 files changed, 8 insertions(+), 25 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 78f4863adc41..2e61773251b6 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -428,7 +428,7 @@ class SparkContext(config: SparkConf) extends Logging {
     conf.setIfMissing("spark.hadoop.fs.s3a.vectored.read.min.seek.size", 
"128K")
     conf.setIfMissing("spark.hadoop.fs.s3a.vectored.read.max.merged.size", 
"2M")
     // This should be set as early as possible.
-    SparkContext.fillMissingMagicCommitterConfsIfNeeded(_conf)
+    SparkContext.enableMagicCommitterIfNeeded(_conf)
 
     SparkContext.supplementJavaModuleOptions(_conf)
     SparkContext.supplementJavaIPv6Options(_conf)
@@ -3389,16 +3389,10 @@ object SparkContext extends Logging {
   }
 
   /**
-   * This is a helper function to complete the missing S3A magic committer 
configurations
-   * based on a single conf: 
`spark.hadoop.fs.s3a.bucket.<bucket>.committer.magic.enabled`
+   * Enable Magic Committer by default for all S3 buckets if hadoop-cloud 
module exists.
    */
-  private def fillMissingMagicCommitterConfsIfNeeded(conf: SparkConf): Unit = {
-    val magicCommitterConfs = conf
-      .getAllWithPrefix("spark.hadoop.fs.s3a.bucket.")
-      .filter(_._1.endsWith(".committer.magic.enabled"))
-      .filter(_._2.equalsIgnoreCase("true"))
-    if (magicCommitterConfs.nonEmpty &&
-        
Utils.classIsLoadable("org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter")
 &&
+  private def enableMagicCommitterIfNeeded(conf: SparkConf): Unit = {
+    if 
(Utils.classIsLoadable("org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter")
 &&
         
Utils.classIsLoadable("org.apache.spark.internal.io.cloud.PathOutputCommitProtocol"))
 {
       // Try to enable S3 magic committer if missing
       conf.setIfMissing("spark.hadoop.fs.s3a.committer.magic.enabled", "true")
diff --git a/docs/core-migration-guide.md b/docs/core-migration-guide.md
index a560a6da91a4..7d51801edc67 100644
--- a/docs/core-migration-guide.md
+++ b/docs/core-migration-guide.md
@@ -26,6 +26,7 @@ license: |
 
 - Since Spark 4.1, Spark Master daemon provides REST API by default. To 
restore the behavior before Spark 4.1, you can set `spark.master.rest.enabled` 
to `false`.
 - Since Spark 4.1, Spark will compress RDD checkpoints by default. To restore 
the behavior before Spark 4.1, you can set `spark.checkpoint.compress` to 
`false`.
+- Since Spark 4.1, Spark uses Apache Hadoop Magic Committer for all S3 buckets 
by default. To restore the behavior before Spark 4.1, you can set 
`spark.hadoop.fs.s3a.committer.magic.enabled=false`.
 
 ## Upgrading from Core 3.5 to 4.0
 
diff --git 
a/hadoop-cloud/src/test/scala/org/apache/spark/SparkContextSuite.scala 
b/hadoop-cloud/src/test/scala/org/apache/spark/SparkContextSuite.scala
index 584793b2b575..00b7adcd76c3 100644
--- a/hadoop-cloud/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/hadoop-cloud/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -34,7 +34,7 @@ class SparkContextSuite extends SparkFunSuite with 
BeforeAndAfterEach {
     }
   }
 
-  test("SPARK-35383: Fill missing S3A magic committer configs if needed") {
+  test("SPARK-47618: Use Magic Committer for all S3 buckets by default") {
     Seq(
       "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter",
       "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol"
@@ -44,16 +44,6 @@ class SparkContextSuite extends SparkFunSuite with 
BeforeAndAfterEach {
 
     val c1 = new SparkConf().setAppName("s3a-test").setMaster("local")
     sc = new SparkContext(c1)
-    assert(!sc.getConf.contains("spark.hadoop.fs.s3a.committer.name"))
-    sc.stop()
-
-    val c2 = 
c1.clone.set("spark.hadoop.fs.s3a.bucket.mybucket.committer.magic.enabled", 
"false")
-    sc = new SparkContext(c2)
-    assert(!sc.getConf.contains("spark.hadoop.fs.s3a.committer.name"))
-    sc.stop()
-
-    val c3 = 
c1.clone.set("spark.hadoop.fs.s3a.bucket.mybucket.committer.magic.enabled", 
"true")
-    sc = new SparkContext(c3)
     Seq(
       "spark.hadoop.fs.s3a.committer.magic.enabled" -> "true",
       "spark.hadoop.fs.s3a.committer.name" -> "magic",
@@ -69,10 +59,8 @@ class SparkContextSuite extends SparkFunSuite with 
BeforeAndAfterEach {
     sc.stop()
 
     // Respect a user configuration
-    val c4 = c1.clone
-      .set("spark.hadoop.fs.s3a.committer.magic.enabled", "false")
-      .set("spark.hadoop.fs.s3a.bucket.mybucket.committer.magic.enabled", 
"true")
-    sc = new SparkContext(c4)
+    val c2 = c1.clone.set("spark.hadoop.fs.s3a.committer.magic.enabled", 
"false")
+    sc = new SparkContext(c2)
     Seq(
       "spark.hadoop.fs.s3a.committer.magic.enabled" -> "false",
       "spark.hadoop.fs.s3a.committer.name" -> null,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to