This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new c9ca90c5e [CELEBORN-1965] Rely on all default hadoop providers for S3
auth
c9ca90c5e is described below
commit c9ca90c5ee0d27e4c046e01b7a3e4cbfec062353
Author: Nicolas Fraison <[email protected]>
AuthorDate: Fri May 9 14:16:47 2025 +0800
[CELEBORN-1965] Rely on all default hadoop providers for S3 auth
### What changes were proposed in this pull request?
Support all [default hadoop
providers](https://github.com/apache/hadoop/blob/rel/release-3.3.6/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java#L563)
for S3 authentication
### Why are the changes needed?
As of now celeborn only supports authentication based on ACCESS/SECRET key
while other authentication mechanisms may be required (for ex. ENV var, relying
on
[AWS_CONTAINER_CREDENTIALS_RELATIVE_URI](https://docs.aws.amazon.com/sdkref/latest/guide/feature-container-credentials.html))
### Does this PR introduce _any_ user-facing change?
yes, the `celeborn.storage.s3.secret.key` and
`celeborn.storage.s3.access.key` are removed. In order to still provide those
we should rely on the hadoop config (`celeborn.hadoop.fs.s3a.access.key` /
`celeborn.hadoop.fs.s3a.secret.key`)
### How was this patch tested?
Tested on celeborn cluster deployed on kubernetes and configured to use S3
relying on `IAMInstanceCredentialsProvider`
Closes #3243 from ashangit/nfraison/CELEBORN-1965.
Lead-authored-by: Nicolas Fraison <[email protected]>
Co-authored-by: [email protected]
<[email protected]>
Signed-off-by: mingji <[email protected]>
---
.../org/apache/celeborn/common/CelebornConf.scala | 20 ----------
.../celeborn/common/util/CelebornHadoopUtils.scala | 15 +++++---
docs/configuration/client.md | 2 -
docs/configuration/master.md | 2 -
docs/configuration/worker.md | 2 -
.../apache/celeborn/S3MultipartUploadHandler.java | 43 ++++++++++++----------
.../deploy/worker/storage/TierWriterHelper.java | 15 +++-----
.../service/deploy/worker/storage/TierWriter.scala | 9 +----
8 files changed, 38 insertions(+), 70 deletions(-)
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index f2cc17613..6455e4051 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -1182,10 +1182,6 @@ class CelebornConf(loadDefaults: Boolean) extends
Cloneable with Logging with Se
def partitionSplitMinimumSize: Long = get(WORKER_PARTITION_SPLIT_MIN_SIZE)
def partitionSplitMaximumSize: Long = get(WORKER_PARTITION_SPLIT_MAX_SIZE)
- def s3AccessKey: String = get(S3_ACCESS_KEY).getOrElse("")
-
- def s3SecretKey: String = get(S3_SECRET_KEY).getOrElse("")
-
def s3EndpointRegion: String = get(S3_ENDPOINT_REGION).getOrElse("")
def s3MultiplePartUploadMaxRetries: Int = get(S3_MPU_MAX_RETRIES)
@@ -3165,22 +3161,6 @@ object CelebornConf extends Logging {
.stringConf
.createOptional
- val S3_SECRET_KEY: OptionalConfigEntry[String] =
- buildConf("celeborn.storage.s3.secret.key")
- .categories("worker", "master", "client")
- .version("0.6.0")
- .doc("S3 secret key for Celeborn to store shuffle data.")
- .stringConf
- .createOptional
-
- val S3_ACCESS_KEY: OptionalConfigEntry[String] =
- buildConf("celeborn.storage.s3.access.key")
- .categories("worker", "master", "client")
- .version("0.6.0")
- .doc("S3 access key for Celeborn to store shuffle data.")
- .stringConf
- .createOptional
-
val S3_ENDPOINT_REGION: OptionalConfigEntry[String] =
buildConf("celeborn.storage.s3.endpoint.region")
.categories("worker", "master", "client")
diff --git
a/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala
b/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala
index 3cdb57b97..047012ef6 100644
---
a/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala
+++
b/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala
@@ -18,6 +18,8 @@
package org.apache.celeborn.common.util
import java.io.{File, IOException}
+import java.util
+import java.util.HashSet
import java.util.concurrent.atomic.AtomicBoolean
import org.apache.hadoop.conf.Configuration
@@ -51,17 +53,18 @@ object CelebornHadoopUtils extends Logging {
}
if (conf.s3Dir.nonEmpty) {
- if (conf.s3AccessKey.isEmpty || conf.s3SecretKey.isEmpty ||
conf.s3EndpointRegion.isEmpty) {
- throw new CelebornException(
- "S3 storage is enabled but s3AccessKey, s3SecretKey, or
s3EndpointRegion is not set")
+ if (conf.s3EndpointRegion.isEmpty) {
+ throw new CelebornException("S3 storage is enabled but
s3EndpointRegion is not set")
}
hadoopConf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
hadoopConf.set(
"fs.s3a.aws.credentials.provider",
- "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
- hadoopConf.set("fs.s3a.access.key", conf.s3AccessKey)
- hadoopConf.set("fs.s3a.secret.key", conf.s3SecretKey)
+ "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider," +
+ "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider," +
+ "com.amazonaws.auth.EnvironmentVariableCredentialsProvider," +
+ "org.apache.hadoop.fs.s3a.auth.IAMInstanceCredentialsProvider")
+
hadoopConf.set("fs.s3a.endpoint.region", conf.s3EndpointRegion)
} else if (conf.ossDir.nonEmpty) {
if (conf.ossAccessKey.isEmpty || conf.ossSecretKey.isEmpty ||
conf.ossEndpoint.isEmpty) {
diff --git a/docs/configuration/client.md b/docs/configuration/client.md
index acf5a59a4..6c0ff752d 100644
--- a/docs/configuration/client.md
+++ b/docs/configuration/client.md
@@ -139,9 +139,7 @@ license: |
| celeborn.storage.oss.endpoint | <undefined> | false | OSS endpoint for
Celeborn to store shuffle data. | 0.6.0 | |
| celeborn.storage.oss.ignore.credentials | true | false | Whether to skip oss
credentials, disable this config to support jindo sdk . | 0.6.0 | |
| celeborn.storage.oss.secret.key | <undefined> | false | OSS secret key
for Celeborn to store shuffle data. | 0.6.0 | |
-| celeborn.storage.s3.access.key | <undefined> | false | S3 access key
for Celeborn to store shuffle data. | 0.6.0 | |
| celeborn.storage.s3.dir | <undefined> | false | S3 base directory for
Celeborn to store shuffle data. | 0.6.0 | |
| celeborn.storage.s3.endpoint.region | <undefined> | false | S3
endpoint for Celeborn to store shuffle data. | 0.6.0 | |
-| celeborn.storage.s3.secret.key | <undefined> | false | S3 secret key
for Celeborn to store shuffle data. | 0.6.0 | |
| celeborn.tags.tagsExpr | | true | Expression to filter workers by tags. The
expression is a comma-separated list of tags. The expression is evaluated as a
logical AND of all tags. For example, `prod,high-io` filters workers that have
both the `prod` and `high-io` tags. | 0.6.0 | |
<!--end-include-->
diff --git a/docs/configuration/master.md b/docs/configuration/master.md
index b7e363f15..f17cdcc00 100644
--- a/docs/configuration/master.md
+++ b/docs/configuration/master.md
@@ -96,10 +96,8 @@ license: |
| celeborn.storage.oss.endpoint | <undefined> | false | OSS endpoint for
Celeborn to store shuffle data. | 0.6.0 | |
| celeborn.storage.oss.ignore.credentials | true | false | Whether to skip oss
credentials, disable this config to support jindo sdk . | 0.6.0 | |
| celeborn.storage.oss.secret.key | <undefined> | false | OSS secret key
for Celeborn to store shuffle data. | 0.6.0 | |
-| celeborn.storage.s3.access.key | <undefined> | false | S3 access key
for Celeborn to store shuffle data. | 0.6.0 | |
| celeborn.storage.s3.dir | <undefined> | false | S3 base directory for
Celeborn to store shuffle data. | 0.6.0 | |
| celeborn.storage.s3.endpoint.region | <undefined> | false | S3
endpoint for Celeborn to store shuffle data. | 0.6.0 | |
-| celeborn.storage.s3.secret.key | <undefined> | false | S3 secret key
for Celeborn to store shuffle data. | 0.6.0 | |
| celeborn.tags.enabled | true | false | Whether to enable tags for workers. |
0.6.0 | |
| celeborn.tags.preferClientTagsExpr | false | true | When `true`, prefer the
tags expression provided by the client over the tags expression provided by the
master. | 0.6.0 | |
| celeborn.tags.tagsExpr | | true | Expression to filter workers by tags. The
expression is a comma-separated list of tags. The expression is evaluated as a
logical AND of all tags. For example, `prod,high-io` filters workers that have
both the `prod` and `high-io` tags. | 0.6.0 | |
diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md
index 99739d7d9..b601a59a2 100644
--- a/docs/configuration/worker.md
+++ b/docs/configuration/worker.md
@@ -51,11 +51,9 @@ license: |
| celeborn.storage.oss.endpoint | <undefined> | false | OSS endpoint for
Celeborn to store shuffle data. | 0.6.0 | |
| celeborn.storage.oss.ignore.credentials | true | false | Whether to skip oss
credentials, disable this config to support jindo sdk . | 0.6.0 | |
| celeborn.storage.oss.secret.key | <undefined> | false | OSS secret key
for Celeborn to store shuffle data. | 0.6.0 | |
-| celeborn.storage.s3.access.key | <undefined> | false | S3 access key
for Celeborn to store shuffle data. | 0.6.0 | |
| celeborn.storage.s3.dir | <undefined> | false | S3 base directory for
Celeborn to store shuffle data. | 0.6.0 | |
| celeborn.storage.s3.endpoint.region | <undefined> | false | S3
endpoint for Celeborn to store shuffle data. | 0.6.0 | |
| celeborn.storage.s3.mpu.maxRetries | 5 | false | S3 MPU upload max retries.
| 0.6.0 | |
-| celeborn.storage.s3.secret.key | <undefined> | false | S3 secret key
for Celeborn to store shuffle data. | 0.6.0 | |
| celeborn.worker.activeConnection.max | <undefined> | false | If the
number of active connections on a worker exceeds this configuration value, the
worker will be marked as high-load in the heartbeat report, and the master will
not include that node in the response of RequestSlots. | 0.3.1 | |
| celeborn.worker.applicationRegistry.cache.size | 10000 | false | Cache size
of the application registry on Workers. | 0.5.0 | |
| celeborn.worker.bufferStream.threadsPerMountpoint | 8 | false | Threads
count for read buffer per mount point. | 0.3.0 | |
diff --git
a/multipart-uploader/multipart-uploader-s3/src/main/java/org/apache/celeborn/S3MultipartUploadHandler.java
b/multipart-uploader/multipart-uploader-s3/src/main/java/org/apache/celeborn/S3MultipartUploadHandler.java
index 993199fb4..f22217a2c 100644
---
a/multipart-uploader/multipart-uploader-s3/src/main/java/org/apache/celeborn/S3MultipartUploadHandler.java
+++
b/multipart-uploader/multipart-uploader-s3/src/main/java/org/apache/celeborn/S3MultipartUploadHandler.java
@@ -19,12 +19,13 @@ package org.apache.celeborn;
import java.io.IOException;
import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import com.amazonaws.ClientConfiguration;
-import com.amazonaws.auth.AWSStaticCredentialsProvider;
-import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
import com.amazonaws.event.ProgressListener;
import com.amazonaws.retry.PredefinedRetryPolicies;
import com.amazonaws.services.s3.AmazonS3;
@@ -39,6 +40,13 @@ import com.amazonaws.services.s3.model.PartETag;
import com.amazonaws.services.s3.model.PartListing;
import com.amazonaws.services.s3.model.PartSummary;
import com.amazonaws.services.s3.model.UploadPartRequest;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.s3a.AWSCredentialProviderList;
+import org.apache.hadoop.fs.s3a.Constants;
+import org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider;
+import org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider;
+import org.apache.hadoop.fs.s3a.auth.IAMInstanceCredentialsProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -56,27 +64,22 @@ public class S3MultipartUploadHandler implements
MultipartUploadHandler {
private String bucketName;
- private String s3AccessKey;
-
- private String s3SecretKey;
-
- private String s3EndpointRegion;
-
private Integer s3MultiplePartUploadMaxRetries;
public S3MultipartUploadHandler(
- String bucketName,
- String s3AccessKey,
- String s3SecretKey,
- String s3EndpointRegion,
- String key,
- Integer s3MultiplePartUploadMaxRetries) {
+ FileSystem hadoopFs, String bucketName, String key, Integer
s3MultiplePartUploadMaxRetries)
+ throws IOException, URISyntaxException {
this.bucketName = bucketName;
- this.s3AccessKey = s3AccessKey;
- this.s3SecretKey = s3SecretKey;
- this.s3EndpointRegion = s3EndpointRegion;
this.s3MultiplePartUploadMaxRetries = s3MultiplePartUploadMaxRetries;
- BasicAWSCredentials basicAWSCredentials = new
BasicAWSCredentials(s3AccessKey, s3SecretKey);
+
+ Configuration conf = hadoopFs.getConf();
+ AWSCredentialProviderList providers = new AWSCredentialProviderList();
+ providers.add(new TemporaryAWSCredentialsProvider(conf));
+ providers.add(
+ new SimpleAWSCredentialsProvider(new URI(String.format("s3a://%s",
bucketName)), conf));
+ providers.add(new EnvironmentVariableCredentialsProvider());
+ providers.add(new IAMInstanceCredentialsProvider());
+
ClientConfiguration clientConfig =
new ClientConfiguration()
.withRetryPolicy(
@@ -85,8 +88,8 @@ public class S3MultipartUploadHandler implements
MultipartUploadHandler {
.withMaxErrorRetry(s3MultiplePartUploadMaxRetries);
this.s3Client =
AmazonS3ClientBuilder.standard()
- .withCredentials(new
AWSStaticCredentialsProvider(basicAWSCredentials))
- .withRegion(s3EndpointRegion)
+ .withCredentials(providers)
+ .withRegion(conf.get(Constants.AWS_REGION))
.withClientConfiguration(clientConfig)
.build();
this.key = key;
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/TierWriterHelper.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/TierWriterHelper.java
index 3524beb78..62088e5d7 100644
---
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/TierWriterHelper.java
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/TierWriterHelper.java
@@ -17,29 +17,24 @@
package org.apache.celeborn.service.deploy.worker.storage;
+import org.apache.hadoop.fs.FileSystem;
+
import org.apache.celeborn.reflect.DynConstructors;
import org.apache.celeborn.server.common.service.mpu.MultipartUploadHandler;
public class TierWriterHelper {
public static MultipartUploadHandler getS3MultipartUploadHandler(
- String bucketName,
- String s3AccessKey,
- String s3SecretKey,
- String s3EndpointRegion,
- String key,
- int maxRetryies) {
+ FileSystem hadoopFs, String bucketName, String key, int maxRetryies) {
return (MultipartUploadHandler)
DynConstructors.builder()
.impl(
"org.apache.celeborn.S3MultipartUploadHandler",
- String.class,
- String.class,
- String.class,
+ FileSystem.class,
String.class,
String.class,
Integer.class)
.build()
- .newInstance(bucketName, s3AccessKey, s3SecretKey,
s3EndpointRegion, key, maxRetryies);
+ .newInstance(hadoopFs, bucketName, key, maxRetryies);
}
public static MultipartUploadHandler getOssMultipartUploadHandler(
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
index edfbff675..ecc3372e1 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
@@ -530,21 +530,14 @@ class DfsTierWriter(
try {
hadoopFs.create(hdfsFileInfo.getDfsPath, true).close()
if (hdfsFileInfo.isS3) {
- val configuration = hadoopFs.getConf
- val s3AccessKey = configuration.get("fs.s3a.access.key")
- val s3SecretKey = configuration.get("fs.s3a.secret.key")
- val s3EndpointRegion = configuration.get("fs.s3a.endpoint.region")
-
val uri = hadoopFs.getUri
val bucketName = uri.getHost
val index = hdfsFileInfo.getFilePath.indexOf(bucketName)
val key = hdfsFileInfo.getFilePath.substring(index + bucketName.length +
1)
this.s3MultipartUploadHandler =
TierWriterHelper.getS3MultipartUploadHandler(
+ hadoopFs,
bucketName,
- s3AccessKey,
- s3SecretKey,
- s3EndpointRegion,
key,
conf.s3MultiplePartUploadMaxRetries)
s3MultipartUploadHandler.startUpload()