This is an automated email from the ASF dual-hosted git repository.

gaborgsomogyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 92886f2e975 [FLINK-39166][s3] Add bucket-level configuration support 
to Native S3 FileSystem
92886f2e975 is described below

commit 92886f2e975fe28709c94d141fcff936a50ba079
Author: Samrat <[email protected]>
AuthorDate: Sat May 16 13:38:08 2026 +0530

    [FLINK-39166][s3] Add bucket-level configuration support to Native S3 
FileSystem
---
 flink-filesystems/flink-s3-fs-native/README.md     |  25 ++
 .../flink/fs/s3native/BucketConfigProvider.java    | 202 ++++++++++++
 .../fs/s3native/NativeS3FileSystemFactory.java     |  77 ++++-
 .../apache/flink/fs/s3native/S3BucketConfig.java   | 342 +++++++++++++++++++++
 .../apache/flink/fs/s3native/S3ClientProvider.java |  63 +++-
 .../fs/s3native/BucketConfigProviderTest.java      | 259 ++++++++++++++++
 .../fs/s3native/NativeS3FileSystemFactoryTest.java | 188 ++++++++++-
 .../flink/fs/s3native/S3BucketConfigTest.java      | 170 ++++++++++
 8 files changed, 1299 insertions(+), 27 deletions(-)

diff --git a/flink-filesystems/flink-s3-fs-native/README.md 
b/flink-filesystems/flink-s3-fs-native/README.md
index 6dcf2c74ff0..df9f58823fb 100644
--- a/flink-filesystems/flink-s3-fs-native/README.md
+++ b/flink-filesystems/flink-s3-fs-native/README.md
@@ -90,6 +90,31 @@ input.sinkTo(FileSink.forRowFormat(new 
Path("s3://my-bucket/output"),
 | s3.assume-role.session-name | flink-s3-session | Session name for the 
assumed role |
 | s3.assume-role.session-duration | 3600 | Session duration in seconds 
(900-43200) |
 
+## Bucket-Level Configuration
+
+The Native S3 FileSystem supports per-bucket configuration overrides, allowing 
different S3 buckets to use different connection settings within the same Flink 
cluster. This enables scenarios like:
+
+- **Different credentials per bucket** (e.g., cross-account access for a data 
sink bucket)
+- **Different regions or endpoints** (e.g., checkpoints in `us-east-1`, 
archive bucket in `eu-west-1`)
+- **Bucket-specific encryption** (e.g., SSE-KMS for sensitive data, SSE-S3 for 
logs)
+
+### Format
+
+Bucket-level configuration uses the format: 
`s3.bucket.<bucket-name>.<property>`
+
+Bucket names containing dots (e.g., `my.company.data`) are fully supported 
through longest-suffix matching.
+
+> **Note:** AWS recommends avoiding periods (`.`) in bucket names. Buckets 
with dots cannot use virtual-hosted-style addressing over HTTPS without custom 
certificate validation. If you use dotted bucket names, enable 
`path-style-access` for that bucket (see [AWS 
documentation](https://docs.aws.amazon.com/AmazonS3/latest/userguide/bucketnamingrules.html)).
+
+### Supported Properties
+
+All global S3 configuration properties can be overridden at the bucket level:
+
+- **Connection:** `region`, `endpoint`, `path-style-access`
+- **Credentials:** `access-key`, `secret-key`, `aws.credentials.provider`
+- **Encryption:** `sse.type`, `sse.kms.key-id`
+- **IAM Assume Role:** `assume-role.arn`, `assume-role.external-id`, 
`assume-role.session-name`, `assume-role.session-duration`
+
 ## Server-Side Encryption (SSE)
 
 The filesystem supports server-side encryption for data at rest:
diff --git 
a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/BucketConfigProvider.java
 
b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/BucketConfigProvider.java
new file mode 100644
index 00000000000..05355f8d4a0
--- /dev/null
+++ 
b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/BucketConfigProvider.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3native;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
+
+/**
+ * Parses bucket-specific S3 configuration using format {@code 
s3.bucket.<bucket-name>.<property>}.
+ *
+ * <p>Enables per-bucket overrides for endpoints, credentials, encryption, and 
IAM roles. Bucket
+ * names containing dots are supported; properties are matched by longest 
suffix first.
+ *
+ * <p>Immutable and thread-safe after construction.
+ */
+@Internal
+final class BucketConfigProvider {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(BucketConfigProvider.class);
+    static final String BUCKET_CONFIG_PREFIX = "s3.bucket.";
+    static final Map<String, BiConsumer<S3BucketConfig.Builder, String>> 
PROPERTY_APPLICATORS;
+    static final List<String> KNOWN_PROPERTIES_BY_LENGTH;
+
+    static {
+        final Map<String, BiConsumer<S3BucketConfig.Builder, String>> 
applicators =
+                new LinkedHashMap<>();
+        applicators.put("access-key", S3BucketConfig.Builder::accessKey);
+        applicators.put("assume-role.arn", 
S3BucketConfig.Builder::assumeRoleArn);
+        applicators.put("assume-role.external-id", 
S3BucketConfig.Builder::assumeRoleExternalId);
+        applicators.put(
+                "assume-role.session-duration",
+                (b, v) -> {
+                    try {
+                        
b.assumeRoleSessionDurationSeconds(Integer.parseInt(v));
+                    } catch (NumberFormatException e) {
+                        throw new IllegalConfigurationException(
+                                String.format(
+                                        "Invalid assume-role.session-duration 
'%s' for bucket '%s'. "
+                                                + "Must be a valid integer 
(e.g., 3600)",
+                                        v, b.getBucketName()),
+                                e);
+                    }
+                });
+        applicators.put("assume-role.session-name", 
S3BucketConfig.Builder::assumeRoleSessionName);
+        applicators.put("aws.credentials.provider", 
S3BucketConfig.Builder::credentialsProvider);
+        applicators.put("endpoint", S3BucketConfig.Builder::endpoint);
+        applicators.put(
+                "path-style-access",
+                (b, v) -> {
+                    if (!"true".equalsIgnoreCase(v) && 
!"false".equalsIgnoreCase(v)) {
+                        throw new IllegalConfigurationException(
+                                String.format(
+                                        "Invalid path-style-access '%s' for 
bucket '%s'. "
+                                                + "Must be 'true' or 'false'",
+                                        v, b.getBucketName()));
+                    }
+                    b.pathStyleAccess(Boolean.parseBoolean(v));
+                });
+        applicators.put("region", S3BucketConfig.Builder::region);
+        applicators.put("secret-key", S3BucketConfig.Builder::secretKey);
+        applicators.put("sse.kms.key-id", S3BucketConfig.Builder::sseKmsKeyId);
+        applicators.put("sse.type", S3BucketConfig.Builder::sseType);
+        PROPERTY_APPLICATORS = Collections.unmodifiableMap(applicators);
+
+        KNOWN_PROPERTIES_BY_LENGTH =
+                applicators.keySet().stream()
+                        
.sorted(Comparator.comparingInt(String::length).reversed())
+                        .collect(Collectors.toList());
+    }
+
+    private final Map<String, S3BucketConfig> bucketConfigs;
+
+    BucketConfigProvider(Configuration flinkConfig) {
+        this.bucketConfigs = 
Collections.unmodifiableMap(parseBucketConfigs(flinkConfig));
+    }
+
+    @Nullable
+    S3BucketConfig getBucketConfig(String bucketName) {
+        return bucketConfigs.get(bucketName);
+    }
+
+    @VisibleForTesting
+    boolean hasBucketConfig(String bucketName) {
+        return bucketConfigs.containsKey(bucketName);
+    }
+
+    @VisibleForTesting
+    int size() {
+        return bucketConfigs.size();
+    }
+
+    private static Map<String, S3BucketConfig> 
parseBucketConfigs(Configuration flinkConfig) {
+        final Map<String, Map<String, String>> rawConfigs = new HashMap<>();
+
+        for (final String key : flinkConfig.keySet()) {
+            if (!key.startsWith(BUCKET_CONFIG_PREFIX)) {
+                continue;
+            }
+            final String suffix = key.substring(BUCKET_CONFIG_PREFIX.length());
+            if (StringUtils.isNullOrWhitespaceOnly(suffix)) {
+                continue;
+            }
+            final String value = flinkConfig.getString(key, null);
+            if (StringUtils.isNullOrWhitespaceOnly(value)) {
+                continue;
+            }
+
+            boolean matched = false;
+            for (final String prop : KNOWN_PROPERTIES_BY_LENGTH) {
+                if (suffix.endsWith("." + prop)) {
+                    final String bucketName =
+                            suffix.substring(0, suffix.length() - 
prop.length() - 1);
+                    if (StringUtils.isNullOrWhitespaceOnly(bucketName)) {
+                        LOG.warn(
+                                "Ignoring bucket config key '{}': "
+                                        + "resolved bucket name is empty 
(missing bucket name between "
+                                        + "'s3.bucket.' prefix and '.{}' 
property?).",
+                                key,
+                                prop);
+                    } else {
+                        rawConfigs
+                                .computeIfAbsent(bucketName, k -> new 
HashMap<>())
+                                .put(prop, value);
+                    }
+                    matched = true;
+                    break;
+                }
+            }
+            if (!matched) {
+                LOG.warn(
+                        "Ignoring unrecognized bucket config key '{}'. "
+                                + "Known bucket-level properties: {}",
+                        key,
+                        PROPERTY_APPLICATORS.keySet());
+            }
+        }
+
+        final Map<String, S3BucketConfig> result = new HashMap<>();
+        for (final Map.Entry<String, Map<String, String>> entry : 
rawConfigs.entrySet()) {
+            final String bucketName = entry.getKey();
+            final Map<String, String> props = entry.getValue();
+
+            final S3BucketConfig bucketConfig = buildBucketConfig(bucketName, 
props);
+            if (bucketConfig.hasAnyOverride()) {
+                result.put(bucketName, bucketConfig);
+                LOG.info(
+                        "Registered bucket-specific configuration for bucket 
'{}': {}",
+                        bucketName,
+                        bucketConfig);
+            }
+        }
+
+        return result;
+    }
+
+    private static S3BucketConfig buildBucketConfig(String bucketName, 
Map<String, String> props) {
+        final S3BucketConfig.Builder builder = 
S3BucketConfig.builder(bucketName);
+
+        for (final Map.Entry<String, BiConsumer<S3BucketConfig.Builder, 
String>> entry :
+                PROPERTY_APPLICATORS.entrySet()) {
+            final String value = props.get(entry.getKey());
+            if (value != null) {
+                entry.getValue().accept(builder, value);
+            }
+        }
+
+        return builder.build();
+    }
+}
diff --git 
a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java
 
b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java
index c54ed86f8bf..50c0145be93 100644
--- 
a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java
+++ 
b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java
@@ -27,6 +27,7 @@ import 
org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.FileSystemFactory;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,11 +41,14 @@ import java.util.Collections;
 import java.util.Map;
 
 /**
- * Factory for creating Native S3 FileSystem instances.
+ * Factory for creating Native S3 FileSystem instances using AWS SDK v2.
  *
- * <p>This factory creates {@link NativeS3FileSystem} instances for accessing 
Amazon S3 buckets
- * using AWS SDK v2. The Native S3 FileSystem provides a drop-in replacement 
for Presto and Hadoop
- * S3 implementations with minimal external dependencies.
+ * <h3>Bucket-Level Configuration</h3>
+ *
+ * <p>Supports per-bucket configuration overrides using the format {@code
+ * s3.bucket.<bucket-name>.<property>}. Bucket-level settings override global 
settings; unset
+ * properties inherit global values. See {@link 
BucketConfigProvider#PROPERTY_APPLICATORS} for the
+ * complete list of supported bucket-level properties.
  *
  * @see NativeS3FileSystem
  * @see org.apache.flink.core.fs.FileSystemFactory
@@ -295,6 +299,7 @@ public class NativeS3FileSystemFactory implements 
FileSystemFactory {
                                     + "static credentials (if configured) -> 
DefaultCredentialsProvider.");
 
     @Nullable private Configuration flinkConfig;
+    @Nullable private BucketConfigProvider bucketConfigProvider;
 
     @Override
     public String getScheme() {
@@ -310,6 +315,7 @@ public class NativeS3FileSystemFactory implements 
FileSystemFactory {
     @Override
     public void configure(Configuration config) {
         this.flinkConfig = config;
+        this.bucketConfigProvider = new BucketConfigProvider(config);
     }
 
     @Override
@@ -324,13 +330,56 @@ public class NativeS3FileSystemFactory implements 
FileSystemFactory {
         String region = config.get(REGION);
         String endpoint = config.get(ENDPOINT);
         boolean pathStyleAccess = config.get(PATH_STYLE_ACCESS);
+        String sseType = config.get(SSE_TYPE);
+        String sseKmsKeyId = config.get(SSE_KMS_KEY_ID);
+        String assumeRoleArn = config.get(ASSUME_ROLE_ARN);
+        String assumeRoleExternalId = config.get(ASSUME_ROLE_EXTERNAL_ID);
+        String assumeRoleSessionName = config.get(ASSUME_ROLE_SESSION_NAME);
+        int assumeRoleSessionDuration = 
config.get(ASSUME_ROLE_SESSION_DURATION_SECONDS);
+        String credentialsProviderClasses = 
config.get(AWS_CREDENTIALS_PROVIDER);
+
+        // Apply bucket-specific overrides
+        String bucketName = fsUri.getHost();
+        if (StringUtils.isNullOrWhitespaceOnly(bucketName)) {
+            throw new IOException("Invalid S3 URI: missing or empty bucket 
name in URI: " + fsUri);
+        }
+        if (bucketConfigProvider != null) {
+            S3BucketConfig overrides = 
bucketConfigProvider.getBucketConfig(bucketName);
+            if (overrides != null) {
+                LOG.debug(
+                        "Applying bucket-specific configuration for bucket 
'{}': {}",
+                        bucketName,
+                        overrides);
+                accessKey = firstNonNull(overrides.getAccessKey(), accessKey);
+                secretKey = firstNonNull(overrides.getSecretKey(), secretKey);
+                region = firstNonNull(overrides.getRegion(), region);
+                endpoint = firstNonNull(overrides.getEndpoint(), endpoint);
+                sseType = firstNonNull(overrides.getSseType(), sseType);
+                sseKmsKeyId = firstNonNull(overrides.getSseKmsKeyId(), 
sseKmsKeyId);
+                assumeRoleArn = firstNonNull(overrides.getAssumeRoleArn(), 
assumeRoleArn);
+                assumeRoleExternalId =
+                        firstNonNull(overrides.getAssumeRoleExternalId(), 
assumeRoleExternalId);
+                assumeRoleSessionName =
+                        firstNonNull(overrides.getAssumeRoleSessionName(), 
assumeRoleSessionName);
+                credentialsProviderClasses =
+                        firstNonNull(
+                                overrides.getCredentialsProvider(), 
credentialsProviderClasses);
+                if (overrides.getPathStyleAccess() != null) {
+                    pathStyleAccess = overrides.getPathStyleAccess();
+                }
+                if (overrides.getAssumeRoleSessionDurationSeconds() != null) {
+                    assumeRoleSessionDuration = 
overrides.getAssumeRoleSessionDurationSeconds();
+                }
+            }
+        }
 
         S3EncryptionConfig encryptionConfig =
                 S3EncryptionConfig.fromConfig(
-                        config.get(SSE_TYPE),
-                        config.get(SSE_KMS_KEY_ID),
+                        sseType,
+                        sseKmsKeyId,
                         config.getOptional(SSE_KMS_ENCRYPTION_CONTEXT)
                                 .orElse(Collections.emptyMap()));
+
         String entropyInjectionKey = config.get(ENTROPY_INJECT_KEY_OPTION);
         int numEntropyChars = -1;
         if (entropyInjectionKey != null) {
@@ -404,13 +453,12 @@ public class NativeS3FileSystemFactory implements 
FileSystemFactory {
                         .socketTimeout(config.get(SOCKET_TIMEOUT))
                         
.connectionMaxIdleTime(config.get(CONNECTION_MAX_IDLE_TIME))
                         .clientCloseTimeout(config.get(CLIENT_CLOSE_TIMEOUT))
-                        .assumeRoleArn(config.get(ASSUME_ROLE_ARN))
-                        
.assumeRoleExternalId(config.get(ASSUME_ROLE_EXTERNAL_ID))
-                        
.assumeRoleSessionName(config.get(ASSUME_ROLE_SESSION_NAME))
-                        .assumeRoleSessionDurationSeconds(
-                                
config.get(ASSUME_ROLE_SESSION_DURATION_SECONDS))
+                        .assumeRoleArn(assumeRoleArn)
+                        .assumeRoleExternalId(assumeRoleExternalId)
+                        .assumeRoleSessionName(assumeRoleSessionName)
+                        
.assumeRoleSessionDurationSeconds(assumeRoleSessionDuration)
                         .maxRetries(config.get(MAX_RETRIES))
-                        
.credentialsProviderClasses(config.get(AWS_CREDENTIALS_PROVIDER))
+                        .credentialsProviderClasses(credentialsProviderClasses)
                         .encryptionConfig(encryptionConfig)
                         .build();
 
@@ -442,4 +490,9 @@ public class NativeS3FileSystemFactory implements 
FileSystemFactory {
                 readBufferSize,
                 config.get(FS_CLOSE_TIMEOUT));
     }
+
+    @Nullable
+    private static <T> T firstNonNull(@Nullable T override, @Nullable T base) {
+        return override != null ? override : base;
+    }
 }
diff --git 
a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3BucketConfig.java
 
b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3BucketConfig.java
new file mode 100644
index 00000000000..0625254cd33
--- /dev/null
+++ 
b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3BucketConfig.java
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3native;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.util.StringUtils;
+
+import javax.annotation.Nullable;
+
+import java.util.Objects;
+
+/**
+ * Immutable, bucket-specific S3 configuration overrides.
+ *
+ * <p>Null values indicate inheritance from global configuration. Only 
explicitly configured values
+ * are non-null. Configuration format: {@code 
s3.bucket.<bucket-name>.<property>}
+ *
+ * <p>Validates that credentials (access-key/secret-key) are either both set 
or both absent.
+ */
+@Internal
+final class S3BucketConfig {
+
+    private final String bucketName;
+    @Nullable private final String region;
+    @Nullable private final String endpoint;
+    @Nullable private final Boolean pathStyleAccess;
+    @Nullable private final String accessKey;
+    @Nullable private final String secretKey;
+    @Nullable private final String sseType;
+    @Nullable private final String sseKmsKeyId;
+    @Nullable private final String assumeRoleArn;
+    @Nullable private final String assumeRoleExternalId;
+    @Nullable private final String assumeRoleSessionName;
+    @Nullable private final Integer assumeRoleSessionDurationSeconds;
+    @Nullable private final String credentialsProvider;
+
+    private S3BucketConfig(Builder builder) {
+        this.bucketName = builder.bucketName;
+        this.region = builder.region;
+        this.endpoint = builder.endpoint;
+        this.pathStyleAccess = builder.pathStyleAccess;
+        this.accessKey = builder.accessKey;
+        this.secretKey = builder.secretKey;
+        this.sseType = builder.sseType;
+        this.sseKmsKeyId = builder.sseKmsKeyId;
+        this.assumeRoleArn = builder.assumeRoleArn;
+        this.assumeRoleExternalId = builder.assumeRoleExternalId;
+        this.assumeRoleSessionName = builder.assumeRoleSessionName;
+        this.assumeRoleSessionDurationSeconds = 
builder.assumeRoleSessionDurationSeconds;
+        this.credentialsProvider = builder.credentialsProvider;
+    }
+
+    String getBucketName() {
+        return bucketName;
+    }
+
+    @Nullable
+    String getRegion() {
+        return region;
+    }
+
+    @Nullable
+    String getEndpoint() {
+        return endpoint;
+    }
+
+    @Nullable
+    Boolean getPathStyleAccess() {
+        return pathStyleAccess;
+    }
+
+    @Nullable
+    String getAccessKey() {
+        return accessKey;
+    }
+
+    @Nullable
+    String getSecretKey() {
+        return secretKey;
+    }
+
+    @Nullable
+    String getSseType() {
+        return sseType;
+    }
+
+    @Nullable
+    String getSseKmsKeyId() {
+        return sseKmsKeyId;
+    }
+
+    @Nullable
+    String getAssumeRoleArn() {
+        return assumeRoleArn;
+    }
+
+    @Nullable
+    String getAssumeRoleExternalId() {
+        return assumeRoleExternalId;
+    }
+
+    @Nullable
+    String getAssumeRoleSessionName() {
+        return assumeRoleSessionName;
+    }
+
+    @Nullable
+    Integer getAssumeRoleSessionDurationSeconds() {
+        return assumeRoleSessionDurationSeconds;
+    }
+
+    @Nullable
+    String getCredentialsProvider() {
+        return credentialsProvider;
+    }
+
+    boolean hasAnyOverride() {
+        return region != null
+                || endpoint != null
+                || pathStyleAccess != null
+                || accessKey != null
+                || secretKey != null
+                || sseType != null
+                || sseKmsKeyId != null
+                || assumeRoleArn != null
+                || assumeRoleExternalId != null
+                || assumeRoleSessionName != null
+                || assumeRoleSessionDurationSeconds != null
+                || credentialsProvider != null;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        S3BucketConfig that = (S3BucketConfig) o;
+        return Objects.equals(bucketName, that.bucketName)
+                && Objects.equals(region, that.region)
+                && Objects.equals(endpoint, that.endpoint)
+                && Objects.equals(pathStyleAccess, that.pathStyleAccess)
+                && Objects.equals(accessKey, that.accessKey)
+                && Objects.equals(secretKey, that.secretKey)
+                && Objects.equals(sseType, that.sseType)
+                && Objects.equals(sseKmsKeyId, that.sseKmsKeyId)
+                && Objects.equals(assumeRoleArn, that.assumeRoleArn)
+                && Objects.equals(assumeRoleExternalId, 
that.assumeRoleExternalId)
+                && Objects.equals(assumeRoleSessionName, 
that.assumeRoleSessionName)
+                && Objects.equals(
+                        assumeRoleSessionDurationSeconds, 
that.assumeRoleSessionDurationSeconds)
+                && Objects.equals(credentialsProvider, 
that.credentialsProvider);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+                bucketName,
+                region,
+                endpoint,
+                pathStyleAccess,
+                accessKey,
+                secretKey,
+                sseType,
+                sseKmsKeyId,
+                assumeRoleArn,
+                assumeRoleExternalId,
+                assumeRoleSessionName,
+                assumeRoleSessionDurationSeconds,
+                credentialsProvider);
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder("S3BucketConfig{");
+        sb.append("bucket='").append(bucketName).append("'");
+        if (region != null) {
+            sb.append(", region='").append(region).append("'");
+        }
+        if (endpoint != null) {
+            sb.append(", endpoint='").append(endpoint).append("'");
+        }
+        if (pathStyleAccess != null) {
+            sb.append(", pathStyleAccess=").append(pathStyleAccess);
+        }
+        if (accessKey != null) {
+            sb.append(", 
credentials=").append(GlobalConfiguration.HIDDEN_CONTENT);
+        }
+        if (sseType != null) {
+            sb.append(", sseType='").append(sseType).append("'");
+        }
+        if (sseKmsKeyId != null) {
+            sb.append(", 
sseKmsKeyId=").append(GlobalConfiguration.HIDDEN_CONTENT);
+        }
+        if (assumeRoleArn != null) {
+            sb.append(", assumeRoleArn='").append(assumeRoleArn).append("'");
+        }
+        if (assumeRoleExternalId != null) {
+            sb.append(", 
assumeRoleExternalId='").append(assumeRoleExternalId).append("'");
+        }
+        if (assumeRoleSessionName != null) {
+            sb.append(", 
assumeRoleSessionName='").append(assumeRoleSessionName).append("'");
+        }
+        if (assumeRoleSessionDurationSeconds != null) {
+            sb.append(", assumeRoleSessionDurationSeconds=")
+                    .append(assumeRoleSessionDurationSeconds);
+        }
+        if (credentialsProvider != null) {
+            sb.append(", 
credentialsProvider='").append(credentialsProvider).append("'");
+        }
+        sb.append('}');
+        return sb.toString();
+    }
+
+    static Builder builder(String bucketName) {
+        if (StringUtils.isNullOrWhitespaceOnly(bucketName)) {
+            throw new IllegalArgumentException("Bucket name must not be null 
or empty");
+        }
+        return new Builder(bucketName);
+    }
+
+    static final class Builder {
+        private final String bucketName;
+        @Nullable private String region;
+        @Nullable private String endpoint;
+        @Nullable private Boolean pathStyleAccess;
+        @Nullable private String accessKey;
+        @Nullable private String secretKey;
+        @Nullable private String sseType;
+        @Nullable private String sseKmsKeyId;
+        @Nullable private String assumeRoleArn;
+        @Nullable private String assumeRoleExternalId;
+        @Nullable private String assumeRoleSessionName;
+        @Nullable private Integer assumeRoleSessionDurationSeconds;
+        @Nullable private String credentialsProvider;
+
+        private Builder(String bucketName) {
+            this.bucketName = bucketName;
+        }
+
+        String getBucketName() {
+            return bucketName;
+        }
+
+        Builder region(String region) {
+            this.region = region;
+            return this;
+        }
+
+        Builder endpoint(String endpoint) {
+            this.endpoint = endpoint;
+            return this;
+        }
+
+        Builder pathStyleAccess(boolean pathStyleAccess) {
+            this.pathStyleAccess = pathStyleAccess;
+            return this;
+        }
+
+        Builder accessKey(String accessKey) {
+            this.accessKey = accessKey;
+            return this;
+        }
+
+        Builder secretKey(String secretKey) {
+            this.secretKey = secretKey;
+            return this;
+        }
+
+        Builder sseType(String sseType) {
+            this.sseType = sseType;
+            return this;
+        }
+
+        Builder sseKmsKeyId(String sseKmsKeyId) {
+            this.sseKmsKeyId = sseKmsKeyId;
+            return this;
+        }
+
+        Builder assumeRoleArn(String assumeRoleArn) {
+            this.assumeRoleArn = assumeRoleArn;
+            return this;
+        }
+
+        Builder assumeRoleExternalId(String assumeRoleExternalId) {
+            this.assumeRoleExternalId = assumeRoleExternalId;
+            return this;
+        }
+
+        Builder assumeRoleSessionName(String assumeRoleSessionName) {
+            this.assumeRoleSessionName = assumeRoleSessionName;
+            return this;
+        }
+
+        Builder assumeRoleSessionDurationSeconds(int 
assumeRoleSessionDurationSeconds) {
+            this.assumeRoleSessionDurationSeconds = 
assumeRoleSessionDurationSeconds;
+            return this;
+        }
+
+        Builder credentialsProvider(String credentialsProvider) {
+            this.credentialsProvider = credentialsProvider;
+            return this;
+        }
+
+        S3BucketConfig build() {
+            boolean hasAccessKey = 
!StringUtils.isNullOrWhitespaceOnly(accessKey);
+            boolean hasSecretKey = 
!StringUtils.isNullOrWhitespaceOnly(secretKey);
+            if (hasAccessKey != hasSecretKey) {
+                throw new IllegalConfigurationException(
+                        String.format(
+                                "Bucket '%s': both 's3.bucket.%s.access-key' 
and "
+                                        + "'s3.bucket.%s.secret-key' must be 
set together. "
+                                        + "Found only %s.",
+                                bucketName,
+                                bucketName,
+                                bucketName,
+                                hasAccessKey ? "access-key" : "secret-key"));
+            }
+            return new S3BucketConfig(this);
+        }
+    }
+}
diff --git 
a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java
 
b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java
index 8f327ff54cf..cd1c918c486 100644
--- 
a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java
+++ 
b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java
@@ -88,6 +88,12 @@ class S3ClientProvider implements AutoCloseableAsync {
     private final boolean checksumValidation;
     private final int maxConnections;
     private final int maxRetries;
+    @Nullable private final String region;
+    @Nullable private final String endpoint;
+    @Nullable private final String assumeRoleArn;
+    @Nullable private final String assumeRoleExternalId;
+    @Nullable private final String assumeRoleSessionName;
+    private final int assumeRoleSessionDurationSeconds;
     private final AtomicBoolean closed = new AtomicBoolean(false);
 
     private S3ClientProvider(
@@ -104,7 +110,13 @@ class S3ClientProvider implements AutoCloseableAsync {
             boolean chunkedEncoding,
             boolean checksumValidation,
             int maxConnections,
-            int maxRetries) {
+            int maxRetries,
+            @Nullable String region,
+            @Nullable String endpoint,
+            @Nullable String assumeRoleArn,
+            @Nullable String assumeRoleExternalId,
+            @Nullable String assumeRoleSessionName,
+            int assumeRoleSessionDurationSeconds) {
         this.s3Client = Preconditions.checkNotNull(s3Client, "s3Client must 
not be null");
         this.transferManager =
                 Preconditions.checkNotNull(transferManager, "transferManager 
must not be null");
@@ -129,6 +141,12 @@ class S3ClientProvider implements AutoCloseableAsync {
         this.checksumValidation = checksumValidation;
         this.maxConnections = maxConnections;
         this.maxRetries = maxRetries;
+        this.region = region;
+        this.endpoint = endpoint;
+        this.assumeRoleArn = assumeRoleArn;
+        this.assumeRoleExternalId = assumeRoleExternalId;
+        this.assumeRoleSessionName = assumeRoleSessionName;
+        this.assumeRoleSessionDurationSeconds = 
assumeRoleSessionDurationSeconds;
     }
 
     public S3Client getS3Client() {
@@ -196,6 +214,41 @@ class S3ClientProvider implements AutoCloseableAsync {
         return maxRetries;
     }
 
+    @VisibleForTesting
+    @Nullable
+    String getRegion() {
+        return region;
+    }
+
+    @VisibleForTesting
+    @Nullable
+    String getEndpoint() {
+        return endpoint;
+    }
+
+    @VisibleForTesting
+    @Nullable
+    String getAssumeRoleArn() {
+        return assumeRoleArn;
+    }
+
+    @VisibleForTesting
+    @Nullable
+    String getAssumeRoleExternalId() {
+        return assumeRoleExternalId;
+    }
+
+    @VisibleForTesting
+    @Nullable
+    String getAssumeRoleSessionName() {
+        return assumeRoleSessionName;
+    }
+
+    @VisibleForTesting
+    int getAssumeRoleSessionDurationSeconds() {
+        return assumeRoleSessionDurationSeconds;
+    }
+
     @Override
     public CompletableFuture<Void> closeAsync() {
         if (!closed.compareAndSet(false, true)) {
@@ -462,7 +515,13 @@ class S3ClientProvider implements AutoCloseableAsync {
                     chunkedEncoding,
                     checksumValidation,
                     maxConnections,
-                    maxRetries);
+                    maxRetries,
+                    region,
+                    endpoint,
+                    assumeRoleArn,
+                    assumeRoleExternalId,
+                    assumeRoleSessionName,
+                    assumeRoleSessionDurationSeconds);
         }
 
         private AwsCredentialsProvider buildBaseCredentialsProvider() {
diff --git 
a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/BucketConfigProviderTest.java
 
b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/BucketConfigProviderTest.java
new file mode 100644
index 00000000000..97921ff3175
--- /dev/null
+++ 
b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/BucketConfigProviderTest.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3native;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link BucketConfigProvider}. */
+class BucketConfigProviderTest {
+
+    /** One test exercises all 11 known properties on a single bucket. */
+    @Test
+    void testParsesAllKnownPropertiesForSingleBucket() {
+        Configuration config = new Configuration();
+        config.setString("s3.bucket.my-bucket.region", "us-west-2");
+        config.setString("s3.bucket.my-bucket.endpoint", 
"https://s3.us-west-2.amazonaws.com";);
+        config.setString("s3.bucket.my-bucket.path-style-access", "true");
+        config.setString("s3.bucket.my-bucket.access-key", 
"AKIAIOSFODNN7EXAMPLE");
+        config.setString(
+                "s3.bucket.my-bucket.secret-key", 
"wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY");
+        config.setString("s3.bucket.my-bucket.sse.type", "sse-kms");
+        config.setString(
+                "s3.bucket.my-bucket.sse.kms.key-id",
+                "arn:aws:kms:us-east-1:123456789:key/12345678");
+        config.setString(
+                "s3.bucket.my-bucket.assume-role.arn",
+                "arn:aws:iam::123456789012:role/S3AccessRole");
+        config.setString("s3.bucket.my-bucket.assume-role.external-id", 
"ext-id-abc");
+        config.setString("s3.bucket.my-bucket.assume-role.session-name", 
"flink-job");
+        config.setString("s3.bucket.my-bucket.assume-role.session-duration", 
"7200");
+        config.setString(
+                "s3.bucket.my-bucket.aws.credentials.provider",
+                
"software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider");
+
+        BucketConfigProvider provider = new BucketConfigProvider(config);
+
+        assertThat(provider.size()).isEqualTo(1);
+        S3BucketConfig bucket = provider.getBucketConfig("my-bucket");
+        assertThat(bucket).isNotNull();
+        assertThat(bucket.getRegion()).isEqualTo("us-west-2");
+        
assertThat(bucket.getEndpoint()).isEqualTo("https://s3.us-west-2.amazonaws.com";);
+        assertThat(bucket.getPathStyleAccess()).isTrue();
+        assertThat(bucket.getAccessKey()).isEqualTo("AKIAIOSFODNN7EXAMPLE");
+        
assertThat(bucket.getSecretKey()).isEqualTo("wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY");
+        assertThat(bucket.getSseType()).isEqualTo("sse-kms");
+        assertThat(bucket.getSseKmsKeyId())
+                .isEqualTo("arn:aws:kms:us-east-1:123456789:key/12345678");
+        assertThat(bucket.getAssumeRoleArn())
+                .isEqualTo("arn:aws:iam::123456789012:role/S3AccessRole");
+        assertThat(bucket.getAssumeRoleExternalId()).isEqualTo("ext-id-abc");
+        assertThat(bucket.getAssumeRoleSessionName()).isEqualTo("flink-job");
+        
assertThat(bucket.getAssumeRoleSessionDurationSeconds()).isEqualTo(7200);
+        assertThat(bucket.getCredentialsProvider())
+                
.isEqualTo("software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider");
+    }
+
+    @Test
+    void testParsesMultipleBuckets() {
+        Configuration config = new Configuration();
+        config.setString(
+                "s3.bucket.checkpoint-bucket.endpoint", 
"https://s3.us-east-1.amazonaws.com";);
+        config.setString("s3.bucket.checkpoint-bucket.region", "us-east-1");
+        config.setString(
+                "s3.bucket.savepoint-bucket.endpoint", 
"https://s3.eu-west-1.amazonaws.com";);
+        config.setString("s3.bucket.savepoint-bucket.region", "eu-west-1");
+        config.setString("s3.bucket.savepoint-bucket.path-style-access", 
"false");
+
+        BucketConfigProvider provider = new BucketConfigProvider(config);
+
+        assertThat(provider.size()).isEqualTo(2);
+
+        S3BucketConfig cpConfig = 
provider.getBucketConfig("checkpoint-bucket");
+        assertThat(cpConfig).isNotNull();
+        
assertThat(cpConfig.getEndpoint()).isEqualTo("https://s3.us-east-1.amazonaws.com";);
+        assertThat(cpConfig.getRegion()).isEqualTo("us-east-1");
+        assertThat(cpConfig.getPathStyleAccess()).isNull();
+
+        S3BucketConfig spConfig = provider.getBucketConfig("savepoint-bucket");
+        assertThat(spConfig).isNotNull();
+        
assertThat(spConfig.getEndpoint()).isEqualTo("https://s3.eu-west-1.amazonaws.com";);
+        assertThat(spConfig.getRegion()).isEqualTo("eu-west-1");
+        assertThat(spConfig.getPathStyleAccess()).isFalse();
+    }
+
+    /** Bucket names containing dots are fully supported via longest-suffix 
matching. */
+    @Test
+    void testDottedBucketName() {
+        Configuration config = new Configuration();
+        config.setString("s3.bucket.my.company.data.endpoint", 
"https://s3-custom.example.com";);
+        config.setString("s3.bucket.my.company.data.region", "ap-southeast-1");
+        config.setString("s3.bucket.my.company.data.sse.type", "sse-s3");
+        config.setString("s3.bucket.my.company.data.sse.kms.key-id", 
"key-123");
+
+        BucketConfigProvider provider = new BucketConfigProvider(config);
+
+        assertThat(provider.hasBucketConfig("my.company.data")).isTrue();
+        S3BucketConfig bucket = provider.getBucketConfig("my.company.data");
+        
assertThat(bucket.getEndpoint()).isEqualTo("https://s3-custom.example.com";);
+        assertThat(bucket.getRegion()).isEqualTo("ap-southeast-1");
+        assertThat(bucket.getSseType()).isEqualTo("sse-s3");
+        assertThat(bucket.getSseKmsKeyId()).isEqualTo("key-123");
+    }
+
+    @Test
+    void testNonBucketConfigKeysIgnored() {
+        Configuration config = new Configuration();
+        config.setString("s3.access-key", "GLOBAL_KEY");
+        config.setString("s3.secret-key", "GLOBAL_SECRET");
+        config.setString("s3.region", "us-east-1");
+        config.setString("s3.bucket.my-bucket.region", "eu-west-1");
+
+        BucketConfigProvider provider = new BucketConfigProvider(config);
+
+        assertThat(provider.size()).isEqualTo(1);
+        assertThat(provider.hasBucketConfig("my-bucket")).isTrue();
+    }
+
+    /** A key whose bucket segment is empty (e.g. {@code s3.bucket..region}) 
must be ignored. */
+    @Test
+    void testEmptyBucketSegmentInKeyIsIgnored() {
+        Configuration config = new Configuration();
+        config.setString("s3.bucket..region", "us-east-1");
+
+        BucketConfigProvider provider = new BucketConfigProvider(config);
+
+        assertThat(provider.size()).isEqualTo(0);
+    }
+
+    /** A bucket whose keys only match unknown properties produces no 
registered bucket config. */
+    @Test
+    void testBucketWithOnlyUnknownPropertiesProducesNoConfig() {
+        Configuration config = new Configuration();
+        config.setString("s3.bucket.my-bucket.unknown-property", "some-value");
+        config.setString("s3.bucket.my-bucket.another-unknown", "other-value");
+
+        BucketConfigProvider provider = new BucketConfigProvider(config);
+
+        assertThat(provider.size()).isEqualTo(0);
+        assertThat(provider.getBucketConfig("my-bucket")).isNull();
+    }
+
+    @Test
+    void testUnknownPropertyMixedWithKnownIsIgnored() {
+        Configuration config = new Configuration();
+        config.setString("s3.bucket.my-bucket.unknown-property", "some-value");
+        config.setString("s3.bucket.my-bucket.region", "us-east-1");
+
+        BucketConfigProvider provider = new BucketConfigProvider(config);
+
+        S3BucketConfig bucket = provider.getBucketConfig("my-bucket");
+        assertThat(bucket).isNotNull();
+        assertThat(bucket.getRegion()).isEqualTo("us-east-1");
+    }
+
+    @Test
+    void testNoBucketConfigReturnsNull() {
+        BucketConfigProvider provider = new BucketConfigProvider(new 
Configuration());
+
+        assertThat(provider.getBucketConfig("non-existent-bucket")).isNull();
+        assertThat(provider.size()).isEqualTo(0);
+    }
+
+    @Test
+    void testEmptyConfigurationProducesNoEntries() {
+        assertThat(new BucketConfigProvider(new 
Configuration()).size()).isEqualTo(0);
+    }
+
+    @Test
+    void testPartialCredentialsRejected() {
+        Configuration config = new Configuration();
+        config.setString("s3.bucket.bad-bucket.access-key", 
"AKIAIOSFODNN7EXAMPLE");
+
+        assertThatThrownBy(() -> new BucketConfigProvider(config))
+                .isInstanceOf(IllegalConfigurationException.class)
+                .hasMessageContaining("must be set together");
+    }
+
+    @Test
+    void testInvalidSessionDurationThrowsException() {
+        Configuration config = new Configuration();
+        config.setString("s3.bucket.my-bucket.assume-role.session-duration", 
"not-a-number");
+        config.setString("s3.bucket.my-bucket.region", "us-east-1");
+
+        assertThatThrownBy(() -> new BucketConfigProvider(config))
+                .isInstanceOf(IllegalConfigurationException.class)
+                .hasMessageContaining("Invalid assume-role.session-duration");
+    }
+
+    @Test
+    void testInvalidPathStyleAccessThrowsException() {
+        Configuration config = new Configuration();
+        config.setString("s3.bucket.my-bucket.path-style-access", "treu");
+        config.setString("s3.bucket.my-bucket.region", "us-east-1");
+
+        assertThatThrownBy(() -> new BucketConfigProvider(config))
+                .isInstanceOf(IllegalConfigurationException.class)
+                .hasMessageContaining("Invalid path-style-access");
+    }
+
+    @Test
+    void testUnrecognizedBucketPropertyIsIgnoredWithoutThrow() {
+        Configuration config = new Configuration();
+        config.setString("s3.bucket.my-bucket.typo-region", "us-east-1");
+        config.setString("s3.bucket.my-bucket.region", "eu-west-1");
+
+        BucketConfigProvider provider = new BucketConfigProvider(config);
+
+        S3BucketConfig bucket = provider.getBucketConfig("my-bucket");
+        assertThat(bucket).isNotNull();
+        assertThat(bucket.getRegion()).isEqualTo("eu-west-1");
+    }
+
+    @Test
+    void testPropertyApplicatorsCoverAllKnownProperties() {
+        assertThat(BucketConfigProvider.PROPERTY_APPLICATORS.size())
+                .as("PROPERTY_APPLICATORS must have an entry for every known 
property")
+                
.isEqualTo(BucketConfigProvider.KNOWN_PROPERTIES_BY_LENGTH.size());
+
+        assertThat(BucketConfigProvider.PROPERTY_APPLICATORS.keySet())
+                .containsExactlyInAnyOrderElementsOf(
+                        BucketConfigProvider.KNOWN_PROPERTIES_BY_LENGTH);
+    }
+
+    @Test
+    void testKnownPropertiesSortedByDescendingLength() {
+        for (int i = 1; i < 
BucketConfigProvider.KNOWN_PROPERTIES_BY_LENGTH.size(); i++) {
+            
assertThat(BucketConfigProvider.KNOWN_PROPERTIES_BY_LENGTH.get(i).length())
+                    .as(
+                            "Property at index %d ('%s') should not be longer 
than property at index %d ('%s')",
+                            i,
+                            
BucketConfigProvider.KNOWN_PROPERTIES_BY_LENGTH.get(i),
+                            i - 1,
+                            
BucketConfigProvider.KNOWN_PROPERTIES_BY_LENGTH.get(i - 1))
+                    .isLessThanOrEqualTo(
+                            
BucketConfigProvider.KNOWN_PROPERTIES_BY_LENGTH.get(i - 1).length());
+        }
+    }
+}
diff --git 
a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactoryTest.java
 
b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactoryTest.java
index 82a35e3d3c4..e673af3a55e 100644
--- 
a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactoryTest.java
+++ 
b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactoryTest.java
@@ -22,7 +22,9 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 
 import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.auth.credentials.AwsCredentials;
 
+import java.io.IOException;
 import java.net.URI;
 import java.time.Duration;
 
@@ -31,7 +33,6 @@ import static 
org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for {@link NativeS3FileSystemFactory}. */
 class NativeS3FileSystemFactoryTest {
-
     private static Configuration baseConfig() {
         Configuration config = new Configuration();
         config.setString("s3.access-key", "test-access-key");
@@ -71,9 +72,12 @@ class NativeS3FileSystemFactoryTest {
 
     @Test
     void testCreateFileSystemWithCustomEndpoint() throws Exception {
+        // Global: endpoint A; bucket: endpoint B → bucket endpoint is used
         Configuration config = baseConfig();
-        config.setString("s3.endpoint", "http://localhost:9000";);
-        assertThat(createFs(config)).isNotNull();
+        config.setString("s3.endpoint", "http://global.s3:9000";);
+        config.setString("s3.bucket.test-bucket.endpoint", 
"http://bucket.s3:9000";);
+        assertThat(createFs(config).getClientProvider().getEndpoint())
+                .isEqualTo("http://bucket.s3:9000";);
     }
 
     @Test
@@ -84,9 +88,12 @@ class NativeS3FileSystemFactoryTest {
 
     @Test
     void testS3ACreateFileSystemWithCustomEndpoint() throws Exception {
+        // Global: endpoint A; bucket: endpoint B → bucket endpoint is used on 
s3a scheme
         Configuration config = baseConfig();
-        config.setString("s3.endpoint", "http://localhost:9000";);
-        assertThat(createS3aFs(config)).isNotNull();
+        config.setString("s3.endpoint", "http://global.s3:9000";);
+        config.setString("s3.bucket.test-bucket.endpoint", 
"http://bucket.s3:9000";);
+        assertThat(createS3aFs(config).getClientProvider().getEndpoint())
+                .isEqualTo("http://bucket.s3:9000";);
     }
 
     // --- Path-style access ---
@@ -97,9 +104,11 @@ class NativeS3FileSystemFactoryTest {
     }
 
     @Test
-    void testPathStyleAccessExplicitlyEnabled() throws Exception {
+    void testPathStyleAccessBucketOverridesGlobal() throws Exception {
+        // Global: false; bucket: true → bucket wins
         Configuration config = baseConfig();
-        config.set(NativeS3FileSystemFactory.PATH_STYLE_ACCESS, true);
+        config.set(NativeS3FileSystemFactory.PATH_STYLE_ACCESS, false);
+        config.setString("s3.bucket.test-bucket.path-style-access", "true");
         
assertThat(createFs(config).getClientProvider().isPathStyleAccess()).isTrue();
     }
 
@@ -313,10 +322,12 @@ class NativeS3FileSystemFactoryTest {
     // --- Region ---
 
     @Test
-    void testExplicitRegionConfiguration() throws Exception {
+    void testRegionBucketOverridesGlobal() throws Exception {
+        // Global: us-east-1; bucket: eu-west-1 → bucket wins
         Configuration config = baseConfig();
-        config.setString("s3.region", "eu-west-1");
-        assertThat(createFs(config)).isNotNull();
+        config.setString("s3.region", "us-east-1");
+        config.setString("s3.bucket.test-bucket.region", "eu-west-1");
+        
assertThat(createFs(config).getClientProvider().getRegion()).isEqualTo("eu-west-1");
     }
 
     @Test
@@ -360,10 +371,17 @@ class NativeS3FileSystemFactoryTest {
     // --- s3a scheme ---
 
     @Test
-    void testS3AWithSSEConfiguration() throws Exception {
+    void testS3AWithSSEBucketOverridesGlobal() throws Exception {
+        // Global: none; bucket: sse-s3 → bucket wins on the s3a scheme
         Configuration config = baseConfig();
-        config.setString("s3.sse.type", "sse-s3");
-        assertThat(createS3aFs(config)).isNotNull();
+        config.setString("s3.sse.type", "none");
+        config.setString("s3.bucket.test-bucket.sse.type", "sse-s3");
+        assertThat(
+                        createS3aFs(config)
+                                .getClientProvider()
+                                .getEncryptionConfig()
+                                .getEncryptionType())
+                .isEqualTo(S3EncryptionConfig.EncryptionType.SSE_S3);
     }
 
     @Test
@@ -378,4 +396,148 @@ class NativeS3FileSystemFactoryTest {
         assertThat(fs.getClientProvider().isChunkedEncoding()).isFalse();
         assertThat(fs.getClientProvider().isChecksumValidation()).isFalse();
     }
+
+    // ---- Bucket-level configuration tests ----
+
+    /**
+     * Validates that misconfigured per-bucket credentials surface as a 
configuration error at
+     * {@code configure()} time, not as an opaque AWS SDK error at first 
request. Override
+     * resolution itself (which wins between bucket and global) is 
exhaustively covered by {@code
+     * BucketConfigProviderTest}; this test guards the factory-layer behaviour 
that is unique to it:
+     * throwing on partial bucket credentials.
+     */
+    @Test
+    void testBucketSpecificPartialCredentialsThrows() {
+        NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory();
+        Configuration config = new Configuration();
+        config.setString("s3.access-key", "global-access-key");
+        config.setString("s3.secret-key", "global-secret-key");
+        config.setString("s3.region", "us-east-1");
+        config.setString("s3.bucket.bad-bucket.access-key", "only-access-key");
+        config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir"));
+
+        assertThatThrownBy(() -> factory.configure(config))
+                .isInstanceOf(IllegalConfigurationException.class)
+                .hasMessageContaining("must be set together");
+    }
+
+    @Test
+    void testBucketOverrideWinsForConnectionAndEncryptionFields() throws 
Exception {
+        Configuration config = new Configuration();
+        config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir"));
+        // Global
+        config.setString("s3.access-key", "global-access");
+        config.setString("s3.secret-key", "global-secret");
+        config.setString("s3.region", "us-east-1");
+        config.setString("s3.endpoint", "http://global.s3:9000";);
+        config.set(NativeS3FileSystemFactory.PATH_STYLE_ACCESS, false);
+        config.setString("s3.sse.type", "sse-s3");
+        config.setString("s3.sse.kms.key-id", "global-kms-key");
+        // Bucket
+        config.setString("s3.bucket.test-bucket.access-key", "bucket-access");
+        config.setString("s3.bucket.test-bucket.secret-key", "bucket-secret");
+        config.setString("s3.bucket.test-bucket.region", "eu-west-1");
+        config.setString("s3.bucket.test-bucket.endpoint", 
"http://bucket.s3:9000";);
+        config.setString("s3.bucket.test-bucket.path-style-access", "true");
+        config.setString("s3.bucket.test-bucket.sse.type", "sse-kms");
+        config.setString("s3.bucket.test-bucket.sse.kms.key-id", 
"bucket-kms-key");
+
+        S3ClientProvider provider = createFs(config).getClientProvider();
+
+        // All bucket values win over global
+        assertThat(provider.getRegion()).isEqualTo("eu-west-1");
+        assertThat(provider.getEndpoint()).isEqualTo("http://bucket.s3:9000";);
+        assertThat(provider.isPathStyleAccess()).isTrue();
+        
assertThat(provider.getCredentialsProvider().resolveCredentials().accessKeyId())
+                .isEqualTo("bucket-access");
+        
assertThat(provider.getCredentialsProvider().resolveCredentials().secretAccessKey())
+                .isEqualTo("bucket-secret");
+        assertThat(provider.getEncryptionConfig().getEncryptionType())
+                .isEqualTo(S3EncryptionConfig.EncryptionType.SSE_KMS);
+        
assertThat(provider.getEncryptionConfig().getKmsKeyId()).isEqualTo("bucket-kms-key");
+    }
+
+    @Test
+    void testBucketOverrideWinsForAssumeRoleFields() throws Exception {
+        Configuration config = new Configuration();
+        config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir"));
+        config.setString("s3.access-key", "global-access");
+        config.setString("s3.secret-key", "global-secret");
+        config.setString("s3.region", "us-east-1");
+        // Global assume-role
+        config.setString("s3.assume-role.arn", 
"arn:aws:iam::111111111111:role/GlobalRole");
+        config.setString("s3.assume-role.external-id", "global-ext-id");
+        config.setString("s3.assume-role.session-name", "global-session");
+        
config.set(NativeS3FileSystemFactory.ASSUME_ROLE_SESSION_DURATION_SECONDS, 900);
+        // Bucket assume-role overrides
+        config.setString(
+                "s3.bucket.test-bucket.assume-role.arn",
+                "arn:aws:iam::222222222222:role/BucketRole");
+        config.setString("s3.bucket.test-bucket.assume-role.external-id", 
"bucket-ext-id");
+        config.setString("s3.bucket.test-bucket.assume-role.session-name", 
"bucket-session");
+        config.setString("s3.bucket.test-bucket.assume-role.session-duration", 
"1800");
+
+        S3ClientProvider provider = createFs(config).getClientProvider();
+        assertThat(provider.getAssumeRoleArn())
+                .isEqualTo("arn:aws:iam::222222222222:role/BucketRole");
+        
assertThat(provider.getAssumeRoleExternalId()).isEqualTo("bucket-ext-id");
+        
assertThat(provider.getAssumeRoleSessionName()).isEqualTo("bucket-session");
+        
assertThat(provider.getAssumeRoleSessionDurationSeconds()).isEqualTo(1800);
+    }
+
+    @Test
+    void testBucketOverrideWinsForCredentialsProvider() throws Exception {
+        Configuration config = new Configuration();
+        config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir"));
+        config.setString("s3.region", "us-east-1");
+        // Global: static credentials
+        config.setString("s3.access-key", "global-access");
+        config.setString("s3.secret-key", "global-secret");
+        // Bucket: AnonymousCredentialsProvider
+        config.setString(
+                "s3.bucket.test-bucket.aws.credentials.provider",
+                
"software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider");
+        AwsCredentials creds =
+                
createFs(config).getClientProvider().getCredentialsProvider().resolveCredentials();
+
+        // Bucket anonymous provider wins; global static key "global-access" 
must not be used
+        assertThat(creds.accessKeyId()).isNotEqualTo("global-access");
+    }
+
+    @Test
+    void testBucketOverrideIgnoredForDifferentBucket() throws Exception {
+        Configuration config = baseConfig();
+        config.setString("s3.region", "us-east-1");
+        config.setString("s3.endpoint", "http://global.s3:9000";);
+        config.setString("s3.bucket.other-bucket.region", "ap-south-1");
+        config.setString("s3.bucket.other-bucket.endpoint", 
"http://other.s3:9000";);
+
+        // createFs uses URI s3://test-bucket/ — "other-bucket" overrides must 
NOT apply
+        NativeS3FileSystem fs = createFs(config);
+        assertThat(fs.getClientProvider().getRegion()).isEqualTo("us-east-1");
+        
assertThat(fs.getClientProvider().getEndpoint()).isEqualTo("http://global.s3:9000";);
+    }
+
+    @Test
+    void testPartialBucketOverrideFallsBackToGlobal() throws Exception {
+        Configuration config = baseConfig();
+        config.setString("s3.region", "us-east-1");
+        config.setString("s3.endpoint", "http://global.s3:9000";);
+        // Override only region for the bucket — endpoint falls back to global
+        config.setString("s3.bucket.test-bucket.region", "eu-central-1");
+
+        NativeS3FileSystem fs = createFs(config);
+        
assertThat(fs.getClientProvider().getRegion()).isEqualTo("eu-central-1");
+        
assertThat(fs.getClientProvider().getEndpoint()).isEqualTo("http://global.s3:9000";);
+    }
+
+    @Test
+    void testMissingBucketNameInUriThrowsIOException() {
+        NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory();
+        factory.configure(baseConfig());
+
+        assertThatThrownBy(() -> 
factory.create(URI.create("s3:///path/to/file")))
+                .isInstanceOf(IOException.class)
+                .hasMessageContaining("bucket name");
+    }
 }
diff --git 
a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3BucketConfigTest.java
 
b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3BucketConfigTest.java
new file mode 100644
index 00000000000..00543c0c9c8
--- /dev/null
+++ 
b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3BucketConfigTest.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3native;
+
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link S3BucketConfig}. */
+class S3BucketConfigTest {
+
+    @Test
+    void testBuilderWithAllFields() {
+        S3BucketConfig config =
+                S3BucketConfig.builder("my-bucket")
+                        .region("us-west-2")
+                        .endpoint("https://custom.s3.endpoint";)
+                        .pathStyleAccess(true)
+                        .accessKey("AKIAIOSFODNN7EXAMPLE")
+                        .secretKey("wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY")
+                        .sseType("sse-kms")
+                        .sseKmsKeyId("arn:aws:kms:us-east-1:123:key/abc")
+                        .assumeRoleArn("arn:aws:iam::123:role/S3Role")
+                        .assumeRoleExternalId("ext-id-123")
+                        .assumeRoleSessionName("my-session")
+                        .assumeRoleSessionDurationSeconds(7200)
+                        .credentialsProvider("AnonymousCredentialsProvider")
+                        .build();
+
+        assertThat(config.getBucketName()).isEqualTo("my-bucket");
+        assertThat(config.getRegion()).isEqualTo("us-west-2");
+        
assertThat(config.getEndpoint()).isEqualTo("https://custom.s3.endpoint";);
+        assertThat(config.getPathStyleAccess()).isTrue();
+        assertThat(config.getAccessKey()).isEqualTo("AKIAIOSFODNN7EXAMPLE");
+        
assertThat(config.getSecretKey()).isEqualTo("wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY");
+        assertThat(config.getSseType()).isEqualTo("sse-kms");
+        
assertThat(config.getSseKmsKeyId()).isEqualTo("arn:aws:kms:us-east-1:123:key/abc");
+        
assertThat(config.getAssumeRoleArn()).isEqualTo("arn:aws:iam::123:role/S3Role");
+        assertThat(config.getAssumeRoleExternalId()).isEqualTo("ext-id-123");
+        assertThat(config.getAssumeRoleSessionName()).isEqualTo("my-session");
+        
assertThat(config.getAssumeRoleSessionDurationSeconds()).isEqualTo(7200);
+        
assertThat(config.getCredentialsProvider()).isEqualTo("AnonymousCredentialsProvider");
+        assertThat(config.hasAnyOverride()).isTrue();
+    }
+
+    @Test
+    void testNoOverridesHasAnyOverrideFalse() {
+        
assertThat(S3BucketConfig.builder("empty-bucket").build().hasAnyOverride()).isFalse();
+    }
+
+    /** Each field, when set alone, must trigger {@code hasAnyOverride()}. */
+    static Stream<S3BucketConfig> singleFieldConfigs() {
+        return Stream.of(
+                S3BucketConfig.builder("b").region("us-east-1").build(),
+                
S3BucketConfig.builder("b").endpoint("http://localhost:9000";).build(),
+                S3BucketConfig.builder("b").pathStyleAccess(true).build(),
+                
S3BucketConfig.builder("b").accessKey("KEY").secretKey("SECRET").build(),
+                S3BucketConfig.builder("b").sseType("sse-s3").build(),
+                S3BucketConfig.builder("b").sseKmsKeyId("key-id").build(),
+                
S3BucketConfig.builder("b").assumeRoleArn("arn:aws:iam::1:role/R").build(),
+                
S3BucketConfig.builder("b").assumeRoleExternalId("ext-id").build(),
+                
S3BucketConfig.builder("b").assumeRoleSessionName("session").build(),
+                
S3BucketConfig.builder("b").assumeRoleSessionDurationSeconds(900).build(),
+                S3BucketConfig.builder("b")
+                        .credentialsProvider("AnonymousCredentialsProvider")
+                        .build());
+    }
+
+    @ParameterizedTest
+    @MethodSource("singleFieldConfigs")
+    void testEachFieldAloneTriggersHasAnyOverride(S3BucketConfig config) {
+        assertThat(config.hasAnyOverride())
+                .as("hasAnyOverride() must be true when any single field is 
set")
+                .isTrue();
+    }
+
+    @Test
+    void testPartialCredentialsAccessKeyOnlyRejected() {
+        assertThatThrownBy(
+                        () -> 
S3BucketConfig.builder("b").accessKey("AKIAIOSFODNN7EXAMPLE").build())
+                .isInstanceOf(IllegalConfigurationException.class)
+                .hasMessageContaining("access-key")
+                .hasMessageContaining("secret-key")
+                .hasMessageContaining("must be set together");
+    }
+
+    @Test
+    void testPartialCredentialsSecretKeyOnlyRejected() {
+        assertThatThrownBy(
+                        () ->
+                                S3BucketConfig.builder("b")
+                                        
.secretKey("wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY")
+                                        .build())
+                .isInstanceOf(IllegalConfigurationException.class)
+                .hasMessageContaining("access-key")
+                .hasMessageContaining("secret-key")
+                .hasMessageContaining("must be set together");
+    }
+
+    @Test
+    void testToStringRedactsCredentials() {
+        S3BucketConfig config =
+                S3BucketConfig.builder("secure-bucket")
+                        .accessKey("AKIAIOSFODNN7EXAMPLE")
+                        .secretKey("wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY")
+                        .region("us-east-1")
+                        .build();
+
+        String str = config.toString();
+        assertThat(str).contains("credentials=" + 
GlobalConfiguration.HIDDEN_CONTENT);
+        assertThat(str).doesNotContain("AKIAIOSFODNN7EXAMPLE");
+        assertThat(str).doesNotContain("wJalrXUtnFEMI");
+    }
+
+    @Test
+    void testToStringRedactsKmsKeyIdAndIncludesAllFields() {
+        S3BucketConfig config =
+                S3BucketConfig.builder("my-bucket")
+                        .region("us-west-2")
+                        .endpoint("https://s3.example.com";)
+                        .pathStyleAccess(true)
+                        .accessKey("AKIAIOSFODNN7EXAMPLE")
+                        .secretKey("wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY")
+                        .sseType("sse-kms")
+                        .sseKmsKeyId("arn:aws:kms:us-east-1:123:key/abc")
+                        .assumeRoleArn("arn:aws:iam::123:role/R")
+                        .assumeRoleExternalId("ext-id")
+                        .assumeRoleSessionName("my-session")
+                        .assumeRoleSessionDurationSeconds(3600)
+                        .credentialsProvider("AnonymousCredentialsProvider")
+                        .build();
+
+        String str = config.toString();
+        assertThat(str).contains("region='us-west-2'");
+        assertThat(str).contains("endpoint='https://s3.example.com'");
+        assertThat(str).contains("pathStyleAccess=true");
+        assertThat(str).contains("sseType='sse-kms'");
+        assertThat(str).contains("sseKmsKeyId=" + 
GlobalConfiguration.HIDDEN_CONTENT);
+        assertThat(str).doesNotContain("arn:aws:kms:us-east-1:123:key/abc");
+        assertThat(str).contains("assumeRoleArn='arn:aws:iam::123:role/R'");
+        assertThat(str).contains("assumeRoleExternalId='ext-id'");
+        assertThat(str).contains("assumeRoleSessionName='my-session'");
+        assertThat(str).contains("assumeRoleSessionDurationSeconds=3600");
+        
assertThat(str).contains("credentialsProvider='AnonymousCredentialsProvider'");
+    }
+}

Reply via email to