This is an automated email from the ASF dual-hosted git repository.
yangjie01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new cf3ef31bdfb0 [SPARK-52287][CORE] Improve `SparkContext` not to populate `o.a.s.internal.io.cloud.*`-related settings if the classes do not exist
cf3ef31bdfb0 is described below
commit cf3ef31bdfb0e9e32ddee96178ab68bcdf8d798c
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Sat May 24 13:07:51 2025 +0800
[SPARK-52287][CORE] Improve `SparkContext` not to populate `o.a.s.internal.io.cloud.*`-related settings if the classes do not exist
### What changes were proposed in this pull request?
This is an improvement to prevent Spark from throwing a confusing exception at users.
Technically, the underlying behavior is a regression in Apache Spark 4.0.0 compared to 3.5.5.
**Apache Spark 3.5.5**
```
$ bin/spark-shell -c "spark.hadoop.fs.s3a.bucket.*.committer.magic.enabled=true"
scala> spark.range(1).count
res0: Long = 1
```
**Apache Spark 4.0.0**
```
$ bin/spark-shell -c "spark.hadoop.fs.s3a.bucket.*.committer.magic.enabled=true"
scala> spark.range(1).count
...
Caused by: java.lang.IllegalArgumentException: 'org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter' in spark.sql.parquet.output.committer.class is invalid. Class must be loadable and subclass of org.apache.hadoop.mapreduce.OutputCommitter
...
```
**After this PR**
```
$ bin/spark-shell -c "spark.hadoop.fs.s3a.bucket.*.committer.magic.enabled=true"
...
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
   /___/ .__/\_,_/_/ /_/\_\   version 4.1.0-SNAPSHOT
      /_/
Using Scala version 2.13.16 (OpenJDK 64-Bit Server VM, Java 17.0.15)
...
scala> spark.range(1).count()
val res0: Long = 1
```
### Why are the changes needed?
Since Apache Spark 3.2.0, Spark has let users enable the S3A magic committer with a single
configuration, `spark.hadoop.fs.s3a.bucket.<bucket>.committer.magic.enabled=true`, by
automatically populating the remaining required magic committer settings (see #32518).
For example, the following settings are filled in:
```
spark.hadoop.fs.s3a.committer.magic.enabled=true
spark.hadoop.fs.s3a.committer.name=magic
spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a=org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory
spark.sql.parquet.output.committer.class=org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter
spark.sql.sources.commitProtocolClass=org.apache.spark.internal.io.cloud.PathOutputCommitProtocol
```
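For reference, here is a minimal, self-contained sketch of that auto-population behavior (illustrative only: the bucket name `mybucket` and the standalone `SparkConf` are examples; the real logic lives inside `SparkContext`):
```
import org.apache.spark.SparkConf

// Illustrative sketch: if any per-bucket magic committer flag is "true",
// fill in the remaining committer settings unless the user already set them.
val conf = new SparkConf()
  .set("spark.hadoop.fs.s3a.bucket.mybucket.committer.magic.enabled", "true")

val magicEnabled = conf
  .getAllWithPrefix("spark.hadoop.fs.s3a.bucket.")
  .filter(_._1.endsWith(".committer.magic.enabled"))
  .exists(_._2.equalsIgnoreCase("true"))

if (magicEnabled) {
  // A user-provided global flag wins over the per-bucket shortcut.
  conf.setIfMissing("spark.hadoop.fs.s3a.committer.magic.enabled", "true")
  if (conf.get("spark.hadoop.fs.s3a.committer.magic.enabled").equals("true")) {
    conf.setIfMissing("spark.hadoop.fs.s3a.committer.name", "magic")
    conf.setIfMissing("spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a",
      "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory")
    conf.setIfMissing("spark.sql.parquet.output.committer.class",
      "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter")
    conf.setIfMissing("spark.sql.sources.commitProtocolClass",
      "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")
  }
}
```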
However, this assumes that the distribution was built with `-Phadoop-cloud`. Some
distributions, such as the Apache Spark binary distribution, are not built with
`-Phadoop-cloud`, so they do not contain the
`org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter` and
`org.apache.spark.internal.io.cloud.PathOutputCommitProtocol` classes.
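A quick way to check whether a given distribution includes the `hadoop-cloud` module is to probe for one of these classes from `spark-shell` (a plain JVM class-loading sketch, not a Spark API):
```
// Returns true only if the hadoop-cloud committer classes are on the classpath.
val hasHadoopCloud =
  try {
    Class.forName("org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")
    true
  } catch {
    case _: ClassNotFoundException => false
  }
```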
### Does this PR introduce _any_ user-facing change?
- This is a regression fix for Apache Spark 4.0.0 relative to 3.5.5.
- The issue only occurs when a user tries to use the S3A magic committer on a Spark
distribution built without `-Phadoop-cloud`.
### How was this patch tested?
Pass the CIs with the newly added test case.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #51005 from dongjoon-hyun/SPARK-52287.
Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: yangjie01 <[email protected]>
---
.../main/scala/org/apache/spark/SparkContext.scala | 4 +-
.../scala/org/apache/spark/SparkContextSuite.scala | 20 ++---
.../scala/org/apache/spark/SparkContextSuite.scala | 91 ++++++++++++++++++++++
3 files changed, 102 insertions(+), 13 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index a82fd5264337..78f4863adc41 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -3397,7 +3397,9 @@ object SparkContext extends Logging {
       .getAllWithPrefix("spark.hadoop.fs.s3a.bucket.")
       .filter(_._1.endsWith(".committer.magic.enabled"))
       .filter(_._2.equalsIgnoreCase("true"))
-    if (magicCommitterConfs.nonEmpty) {
+    if (magicCommitterConfs.nonEmpty &&
+        Utils.classIsLoadable("org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter") &&
+        Utils.classIsLoadable("org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")) {
       // Try to enable S3 magic committer if missing
       conf.setIfMissing("spark.hadoop.fs.s3a.committer.magic.enabled", "true")
       if (conf.get("spark.hadoop.fs.s3a.committer.magic.enabled").equals("true")) {
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index 6473f823406c..9e5859feefb5 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -1275,6 +1275,13 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
}
test("SPARK-35383: Fill missing S3A magic committer configs if needed") {
+ Seq(
+ "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter",
+ "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol"
+ ).foreach { className =>
+ assert(!Utils.classIsLoadable(className))
+ }
+
val c1 = new SparkConf().setAppName("s3a-test").setMaster("local")
sc = new SparkContext(c1)
assert(!sc.getConf.contains("spark.hadoop.fs.s3a.committer.name"))
@@ -1287,18 +1294,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
    resetSparkContext()
    val c3 = c1.clone.set("spark.hadoop.fs.s3a.bucket.mybucket.committer.magic.enabled", "true")
sc = new SparkContext(c3)
- Seq(
- "spark.hadoop.fs.s3a.committer.magic.enabled" -> "true",
- "spark.hadoop.fs.s3a.committer.name" -> "magic",
- "spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a" ->
- "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory",
- "spark.sql.parquet.output.committer.class" ->
- "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter",
- "spark.sql.sources.commitProtocolClass" ->
- "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol"
- ).foreach { case (k, v) =>
- assert(v == sc.getConf.get(k))
- }
+ assert(!sc.getConf.contains("spark.hadoop.fs.s3a.committer.name"))
// Respect a user configuration
resetSparkContext()
diff --git a/hadoop-cloud/src/test/scala/org/apache/spark/SparkContextSuite.scala b/hadoop-cloud/src/test/scala/org/apache/spark/SparkContextSuite.scala
new file mode 100644
index 000000000000..584793b2b575
--- /dev/null
+++ b/hadoop-cloud/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.util.Utils
+
+class SparkContextSuite extends SparkFunSuite with BeforeAndAfterEach {
+ @transient var sc: SparkContext = _
+
+ override def afterEach(): Unit = {
+ try {
+ if (sc != null) {
+ sc.stop()
+ }
+ } finally {
+ super.afterEach()
+ }
+ }
+
+ test("SPARK-35383: Fill missing S3A magic committer configs if needed") {
+ Seq(
+ "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter",
+ "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol"
+ ).foreach { className =>
+ assert(Utils.classIsLoadable(className))
+ }
+
+ val c1 = new SparkConf().setAppName("s3a-test").setMaster("local")
+ sc = new SparkContext(c1)
+ assert(!sc.getConf.contains("spark.hadoop.fs.s3a.committer.name"))
+ sc.stop()
+
+    val c2 = c1.clone.set("spark.hadoop.fs.s3a.bucket.mybucket.committer.magic.enabled", "false")
+ sc = new SparkContext(c2)
+ assert(!sc.getConf.contains("spark.hadoop.fs.s3a.committer.name"))
+ sc.stop()
+
+    val c3 = c1.clone.set("spark.hadoop.fs.s3a.bucket.mybucket.committer.magic.enabled", "true")
+ sc = new SparkContext(c3)
+ Seq(
+ "spark.hadoop.fs.s3a.committer.magic.enabled" -> "true",
+ "spark.hadoop.fs.s3a.committer.name" -> "magic",
+ "spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a" ->
+ "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory",
+ "spark.sql.parquet.output.committer.class" ->
+ "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter",
+ "spark.sql.sources.commitProtocolClass" ->
+ "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol"
+ ).foreach { case (k, v) =>
+ assert(v == sc.getConf.get(k))
+ }
+ sc.stop()
+
+ // Respect a user configuration
+ val c4 = c1.clone
+ .set("spark.hadoop.fs.s3a.committer.magic.enabled", "false")
+      .set("spark.hadoop.fs.s3a.bucket.mybucket.committer.magic.enabled", "true")
+ sc = new SparkContext(c4)
+ Seq(
+ "spark.hadoop.fs.s3a.committer.magic.enabled" -> "false",
+ "spark.hadoop.fs.s3a.committer.name" -> null,
+ "spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a" -> null,
+ "spark.sql.parquet.output.committer.class" -> null,
+ "spark.sql.sources.commitProtocolClass" -> null
+ ).foreach { case (k, v) =>
+ if (v == null) {
+ assert(!sc.getConf.contains(k))
+ } else {
+ assert(v == sc.getConf.get(k))
+ }
+ }
+ sc.stop()
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]