This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new c9ca90c5e [CELEBORN-1965] Rely on all default hadoop providers for S3 auth
c9ca90c5e is described below

commit c9ca90c5ee0d27e4c046e01b7a3e4cbfec062353
Author: Nicolas Fraison <[email protected]>
AuthorDate: Fri May 9 14:16:47 2025 +0800

    [CELEBORN-1965] Rely on all default hadoop providers for S3 auth
    
    ### What changes were proposed in this pull request?
    
    Support all [default Hadoop providers](https://github.com/apache/hadoop/blob/rel/release-3.3.6/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java#L563) for S3 authentication.
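
    For illustration, with this change the Hadoop configuration built by `CelebornHadoopUtils` ends up with a credentials-provider chain roughly equivalent to the following `fs.s3a.*` setting (a sketch of the resulting value, not an extra config users need to add):

    ```properties
    fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider,\
      org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider,\
      com.amazonaws.auth.EnvironmentVariableCredentialsProvider,\
      org.apache.hadoop.fs.s3a.auth.IAMInstanceCredentialsProvider
    ```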
    
    ### Why are the changes needed?
    
    As of now, Celeborn only supports authentication based on an ACCESS/SECRET key pair, while other authentication mechanisms can be required (for example environment variables, or relying on [AWS_CONTAINER_CREDENTIALS_RELATIVE_URI](https://docs.aws.amazon.com/sdkref/latest/guide/feature-container-credentials.html)).
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, `celeborn.storage.s3.secret.key` and `celeborn.storage.s3.access.key` are removed. To keep providing static keys, rely on the Hadoop configuration passthrough instead (`celeborn.hadoop.fs.s3a.access.key` / `celeborn.hadoop.fs.s3a.secret.key`).
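
    For example, a deployment that still wants static credentials could set something like the following (a hedged sketch; the values are placeholders):

    ```properties
    celeborn.hadoop.fs.s3a.access.key=<access-key>
    celeborn.hadoop.fs.s3a.secret.key=<secret-key>
    ```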
    
    ### How was this patch tested?
    
    Tested on a Celeborn cluster deployed on Kubernetes and configured to use S3 via `IAMInstanceCredentialsProvider`.
    
    Closes #3243 from ashangit/nfraison/CELEBORN-1965.
    
    Lead-authored-by: Nicolas Fraison <[email protected]>
    Co-authored-by: [email protected] 
<[email protected]>
    Signed-off-by: mingji <[email protected]>
---
 .../org/apache/celeborn/common/CelebornConf.scala  | 20 ----------
 .../celeborn/common/util/CelebornHadoopUtils.scala | 15 +++++---
 docs/configuration/client.md                       |  2 -
 docs/configuration/master.md                       |  2 -
 docs/configuration/worker.md                       |  2 -
 .../apache/celeborn/S3MultipartUploadHandler.java  | 43 ++++++++++++----------
 .../deploy/worker/storage/TierWriterHelper.java    | 15 +++-----
 .../service/deploy/worker/storage/TierWriter.scala |  9 +----
 8 files changed, 38 insertions(+), 70 deletions(-)

diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index f2cc17613..6455e4051 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -1182,10 +1182,6 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
   def partitionSplitMinimumSize: Long = get(WORKER_PARTITION_SPLIT_MIN_SIZE)
   def partitionSplitMaximumSize: Long = get(WORKER_PARTITION_SPLIT_MAX_SIZE)
 
-  def s3AccessKey: String = get(S3_ACCESS_KEY).getOrElse("")
-
-  def s3SecretKey: String = get(S3_SECRET_KEY).getOrElse("")
-
   def s3EndpointRegion: String = get(S3_ENDPOINT_REGION).getOrElse("")
 
   def s3MultiplePartUploadMaxRetries: Int = get(S3_MPU_MAX_RETRIES)
@@ -3165,22 +3161,6 @@ object CelebornConf extends Logging {
       .stringConf
       .createOptional
 
-  val S3_SECRET_KEY: OptionalConfigEntry[String] =
-    buildConf("celeborn.storage.s3.secret.key")
-      .categories("worker", "master", "client")
-      .version("0.6.0")
-      .doc("S3 secret key for Celeborn to store shuffle data.")
-      .stringConf
-      .createOptional
-
-  val S3_ACCESS_KEY: OptionalConfigEntry[String] =
-    buildConf("celeborn.storage.s3.access.key")
-      .categories("worker", "master", "client")
-      .version("0.6.0")
-      .doc("S3 access key for Celeborn to store shuffle data.")
-      .stringConf
-      .createOptional
-
   val S3_ENDPOINT_REGION: OptionalConfigEntry[String] =
     buildConf("celeborn.storage.s3.endpoint.region")
       .categories("worker", "master", "client")
diff --git a/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala b/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala
index 3cdb57b97..047012ef6 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala
@@ -18,6 +18,8 @@
 package org.apache.celeborn.common.util
 
 import java.io.{File, IOException}
+import java.util
+import java.util.HashSet
 import java.util.concurrent.atomic.AtomicBoolean
 
 import org.apache.hadoop.conf.Configuration
@@ -51,17 +53,18 @@ object CelebornHadoopUtils extends Logging {
     }
 
     if (conf.s3Dir.nonEmpty) {
-      if (conf.s3AccessKey.isEmpty || conf.s3SecretKey.isEmpty || conf.s3EndpointRegion.isEmpty) {
-        throw new CelebornException(
-          "S3 storage is enabled but s3AccessKey, s3SecretKey, or s3EndpointRegion is not set")
+      if (conf.s3EndpointRegion.isEmpty) {
+        throw new CelebornException("S3 storage is enabled but s3EndpointRegion is not set")
       }
       hadoopConf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
       hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
       hadoopConf.set(
         "fs.s3a.aws.credentials.provider",
-        "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
-      hadoopConf.set("fs.s3a.access.key", conf.s3AccessKey)
-      hadoopConf.set("fs.s3a.secret.key", conf.s3SecretKey)
+        "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider," +
+          "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider," +
+          "com.amazonaws.auth.EnvironmentVariableCredentialsProvider," +
+          "org.apache.hadoop.fs.s3a.auth.IAMInstanceCredentialsProvider")
+
       hadoopConf.set("fs.s3a.endpoint.region", conf.s3EndpointRegion)
     } else if (conf.ossDir.nonEmpty) {
       if (conf.ossAccessKey.isEmpty || conf.ossSecretKey.isEmpty || conf.ossEndpoint.isEmpty) {
diff --git a/docs/configuration/client.md b/docs/configuration/client.md
index acf5a59a4..6c0ff752d 100644
--- a/docs/configuration/client.md
+++ b/docs/configuration/client.md
@@ -139,9 +139,7 @@ license: |
 | celeborn.storage.oss.endpoint | &lt;undefined&gt; | false | OSS endpoint for Celeborn to store shuffle data. | 0.6.0 |  | 
 | celeborn.storage.oss.ignore.credentials | true | false | Whether to skip oss credentials, disable this config to support jindo sdk . | 0.6.0 |  | 
 | celeborn.storage.oss.secret.key | &lt;undefined&gt; | false | OSS secret key for Celeborn to store shuffle data. | 0.6.0 |  | 
-| celeborn.storage.s3.access.key | &lt;undefined&gt; | false | S3 access key for Celeborn to store shuffle data. | 0.6.0 |  | 
 | celeborn.storage.s3.dir | &lt;undefined&gt; | false | S3 base directory for Celeborn to store shuffle data. | 0.6.0 |  | 
 | celeborn.storage.s3.endpoint.region | &lt;undefined&gt; | false | S3 endpoint for Celeborn to store shuffle data. | 0.6.0 |  | 
-| celeborn.storage.s3.secret.key | &lt;undefined&gt; | false | S3 secret key for Celeborn to store shuffle data. | 0.6.0 |  | 
 | celeborn.tags.tagsExpr |  | true | Expression to filter workers by tags. The expression is a comma-separated list of tags. The expression is evaluated as a logical AND of all tags. For example, `prod,high-io` filters workers that have both the `prod` and `high-io` tags. | 0.6.0 |  | 
 <!--end-include-->
diff --git a/docs/configuration/master.md b/docs/configuration/master.md
index b7e363f15..f17cdcc00 100644
--- a/docs/configuration/master.md
+++ b/docs/configuration/master.md
@@ -96,10 +96,8 @@ license: |
 | celeborn.storage.oss.endpoint | &lt;undefined&gt; | false | OSS endpoint for Celeborn to store shuffle data. | 0.6.0 |  | 
 | celeborn.storage.oss.ignore.credentials | true | false | Whether to skip oss credentials, disable this config to support jindo sdk . | 0.6.0 |  | 
 | celeborn.storage.oss.secret.key | &lt;undefined&gt; | false | OSS secret key for Celeborn to store shuffle data. | 0.6.0 |  | 
-| celeborn.storage.s3.access.key | &lt;undefined&gt; | false | S3 access key for Celeborn to store shuffle data. | 0.6.0 |  | 
 | celeborn.storage.s3.dir | &lt;undefined&gt; | false | S3 base directory for Celeborn to store shuffle data. | 0.6.0 |  | 
 | celeborn.storage.s3.endpoint.region | &lt;undefined&gt; | false | S3 endpoint for Celeborn to store shuffle data. | 0.6.0 |  | 
-| celeborn.storage.s3.secret.key | &lt;undefined&gt; | false | S3 secret key for Celeborn to store shuffle data. | 0.6.0 |  | 
 | celeborn.tags.enabled | true | false | Whether to enable tags for workers. | 0.6.0 |  | 
 | celeborn.tags.preferClientTagsExpr | false | true | When `true`, prefer the tags expression provided by the client over the tags expression provided by the master. | 0.6.0 |  | 
 | celeborn.tags.tagsExpr |  | true | Expression to filter workers by tags. The expression is a comma-separated list of tags. The expression is evaluated as a logical AND of all tags. For example, `prod,high-io` filters workers that have both the `prod` and `high-io` tags. | 0.6.0 |  | 
diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md
index 99739d7d9..b601a59a2 100644
--- a/docs/configuration/worker.md
+++ b/docs/configuration/worker.md
@@ -51,11 +51,9 @@ license: |
 | celeborn.storage.oss.endpoint | &lt;undefined&gt; | false | OSS endpoint for Celeborn to store shuffle data. | 0.6.0 |  | 
 | celeborn.storage.oss.ignore.credentials | true | false | Whether to skip oss credentials, disable this config to support jindo sdk . | 0.6.0 |  | 
 | celeborn.storage.oss.secret.key | &lt;undefined&gt; | false | OSS secret key for Celeborn to store shuffle data. | 0.6.0 |  | 
-| celeborn.storage.s3.access.key | &lt;undefined&gt; | false | S3 access key for Celeborn to store shuffle data. | 0.6.0 |  | 
 | celeborn.storage.s3.dir | &lt;undefined&gt; | false | S3 base directory for Celeborn to store shuffle data. | 0.6.0 |  | 
 | celeborn.storage.s3.endpoint.region | &lt;undefined&gt; | false | S3 endpoint for Celeborn to store shuffle data. | 0.6.0 |  | 
 | celeborn.storage.s3.mpu.maxRetries | 5 | false | S3 MPU upload max retries. | 0.6.0 |  | 
-| celeborn.storage.s3.secret.key | &lt;undefined&gt; | false | S3 secret key for Celeborn to store shuffle data. | 0.6.0 |  | 
 | celeborn.worker.activeConnection.max | &lt;undefined&gt; | false | If the number of active connections on a worker exceeds this configuration value, the worker will be marked as high-load in the heartbeat report, and the master will not include that node in the response of RequestSlots. | 0.3.1 |  | 
 | celeborn.worker.applicationRegistry.cache.size | 10000 | false | Cache size of the application registry on Workers. | 0.5.0 |  | 
 | celeborn.worker.bufferStream.threadsPerMountpoint | 8 | false | Threads count for read buffer per mount point. | 0.3.0 |  | 
diff --git a/multipart-uploader/multipart-uploader-s3/src/main/java/org/apache/celeborn/S3MultipartUploadHandler.java b/multipart-uploader/multipart-uploader-s3/src/main/java/org/apache/celeborn/S3MultipartUploadHandler.java
index 993199fb4..f22217a2c 100644
--- a/multipart-uploader/multipart-uploader-s3/src/main/java/org/apache/celeborn/S3MultipartUploadHandler.java
+++ b/multipart-uploader/multipart-uploader-s3/src/main/java/org/apache/celeborn/S3MultipartUploadHandler.java
@@ -19,12 +19,13 @@ package org.apache.celeborn;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.List;
 
 import com.amazonaws.ClientConfiguration;
-import com.amazonaws.auth.AWSStaticCredentialsProvider;
-import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
 import com.amazonaws.event.ProgressListener;
 import com.amazonaws.retry.PredefinedRetryPolicies;
 import com.amazonaws.services.s3.AmazonS3;
@@ -39,6 +40,13 @@ import com.amazonaws.services.s3.model.PartETag;
 import com.amazonaws.services.s3.model.PartListing;
 import com.amazonaws.services.s3.model.PartSummary;
 import com.amazonaws.services.s3.model.UploadPartRequest;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.s3a.AWSCredentialProviderList;
+import org.apache.hadoop.fs.s3a.Constants;
+import org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider;
+import org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider;
+import org.apache.hadoop.fs.s3a.auth.IAMInstanceCredentialsProvider;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -56,27 +64,22 @@ public class S3MultipartUploadHandler implements MultipartUploadHandler {
 
   private String bucketName;
 
-  private String s3AccessKey;
-
-  private String s3SecretKey;
-
-  private String s3EndpointRegion;
-
   private Integer s3MultiplePartUploadMaxRetries;
 
   public S3MultipartUploadHandler(
-      String bucketName,
-      String s3AccessKey,
-      String s3SecretKey,
-      String s3EndpointRegion,
-      String key,
-      Integer s3MultiplePartUploadMaxRetries) {
+      FileSystem hadoopFs, String bucketName, String key, Integer s3MultiplePartUploadMaxRetries)
+      throws IOException, URISyntaxException {
     this.bucketName = bucketName;
-    this.s3AccessKey = s3AccessKey;
-    this.s3SecretKey = s3SecretKey;
-    this.s3EndpointRegion = s3EndpointRegion;
     this.s3MultiplePartUploadMaxRetries = s3MultiplePartUploadMaxRetries;
-    BasicAWSCredentials basicAWSCredentials = new BasicAWSCredentials(s3AccessKey, s3SecretKey);
+
+    Configuration conf = hadoopFs.getConf();
+    AWSCredentialProviderList providers = new AWSCredentialProviderList();
+    providers.add(new TemporaryAWSCredentialsProvider(conf));
+    providers.add(
+        new SimpleAWSCredentialsProvider(new URI(String.format("s3a://%s", bucketName)), conf));
+    providers.add(new EnvironmentVariableCredentialsProvider());
+    providers.add(new IAMInstanceCredentialsProvider());
+
     ClientConfiguration clientConfig =
         new ClientConfiguration()
             .withRetryPolicy(
@@ -85,8 +88,8 @@ public class S3MultipartUploadHandler implements MultipartUploadHandler {
             .withMaxErrorRetry(s3MultiplePartUploadMaxRetries);
     this.s3Client =
         AmazonS3ClientBuilder.standard()
-            .withCredentials(new AWSStaticCredentialsProvider(basicAWSCredentials))
-            .withRegion(s3EndpointRegion)
+            .withCredentials(providers)
+            .withRegion(conf.get(Constants.AWS_REGION))
             .withClientConfiguration(clientConfig)
             .build();
     this.key = key;
diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/TierWriterHelper.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/TierWriterHelper.java
index 3524beb78..62088e5d7 100644
--- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/TierWriterHelper.java
+++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/TierWriterHelper.java
@@ -17,29 +17,24 @@
 
 package org.apache.celeborn.service.deploy.worker.storage;
 
+import org.apache.hadoop.fs.FileSystem;
+
 import org.apache.celeborn.reflect.DynConstructors;
 import org.apache.celeborn.server.common.service.mpu.MultipartUploadHandler;
 
 public class TierWriterHelper {
   public static MultipartUploadHandler getS3MultipartUploadHandler(
-      String bucketName,
-      String s3AccessKey,
-      String s3SecretKey,
-      String s3EndpointRegion,
-      String key,
-      int maxRetryies) {
+      FileSystem hadoopFs, String bucketName, String key, int maxRetryies) {
     return (MultipartUploadHandler)
         DynConstructors.builder()
             .impl(
                 "org.apache.celeborn.S3MultipartUploadHandler",
-                String.class,
-                String.class,
-                String.class,
+                FileSystem.class,
                 String.class,
                 String.class,
                 Integer.class)
             .build()
-            .newInstance(bucketName, s3AccessKey, s3SecretKey, s3EndpointRegion, key, maxRetryies);
+            .newInstance(hadoopFs, bucketName, key, maxRetryies);
   }
 
   public static MultipartUploadHandler getOssMultipartUploadHandler(
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
index edfbff675..ecc3372e1 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
@@ -530,21 +530,14 @@ class DfsTierWriter(
   try {
     hadoopFs.create(hdfsFileInfo.getDfsPath, true).close()
     if (hdfsFileInfo.isS3) {
-      val configuration = hadoopFs.getConf
-      val s3AccessKey = configuration.get("fs.s3a.access.key")
-      val s3SecretKey = configuration.get("fs.s3a.secret.key")
-      val s3EndpointRegion = configuration.get("fs.s3a.endpoint.region")
-
       val uri = hadoopFs.getUri
       val bucketName = uri.getHost
       val index = hdfsFileInfo.getFilePath.indexOf(bucketName)
       val key = hdfsFileInfo.getFilePath.substring(index + bucketName.length + 1)
 
       this.s3MultipartUploadHandler = TierWriterHelper.getS3MultipartUploadHandler(
+        hadoopFs,
         bucketName,
-        s3AccessKey,
-        s3SecretKey,
-        s3EndpointRegion,
         key,
         conf.s3MultiplePartUploadMaxRetries)
       s3MultipartUploadHandler.startUpload()
