This is an automated email from the ASF dual-hosted git repository.

gaborgsomogyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 1aa9c855d0c [FLINK-39113][s3] Fix s3.sse.kms.encryption-context config 
in native s3 connector
1aa9c855d0c is described below

commit 1aa9c855d0c0fc2f12b0e221cc348fe9385b861c
Author: Gabor Somogyi <[email protected]>
AuthorDate: Tue May 5 20:21:43 2026 +0200

    [FLINK-39113][s3] Fix s3.sse.kms.encryption-context config in native s3 
connector
---
 flink-filesystems/flink-s3-fs-native/README.md     |   5 +-
 .../fs/s3native/NativeS3FileSystemFactory.java     |  18 +-
 .../flink/fs/s3native/NativeS3OutputStream.java    |  11 +-
 .../apache/flink/fs/s3native/S3ClientProvider.java |  28 ++-
 .../flink/fs/s3native/S3EncryptionConfig.java      |  99 ++++----
 .../s3native/writer/NativeS3ObjectOperations.java  |  35 +--
 .../flink/fs/s3native/S3BlockLocationTest.java     |  46 ++++
 .../flink/fs/s3native/S3EncryptionConfigTest.java  | 276 +++++++++++++++++++++
 .../flink/fs/s3native/S3ExceptionUtilsTest.java    | 137 ++++++++++
 .../apache/flink/fs/s3native/S3FileStatusTest.java |  77 ++++++
 10 files changed, 636 insertions(+), 96 deletions(-)

diff --git a/flink-filesystems/flink-s3-fs-native/README.md 
b/flink-filesystems/flink-s3-fs-native/README.md
index 7f6cfc56bed..6dcf2c74ff0 100644
--- a/flink-filesystems/flink-s3-fs-native/README.md
+++ b/flink-filesystems/flink-s3-fs-native/README.md
@@ -79,6 +79,7 @@ input.sinkTo(FileSink.forRowFormat(new 
Path("s3://my-bucket/output"),
 |-----|---------|-------------|
 | s3.sse.type | none | Encryption type: `none`, `sse-s3` (AES256), `sse-kms` 
(AWS KMS) |
 | s3.sse.kms.key-id | (none) | KMS key ID/ARN/alias for SSE-KMS (uses default 
aws/s3 key if not specified) |
+| s3.sse.kms.encryption-context | (none) | Encryption context key-value pairs 
for SSE-KMS. Format: `key1:value1,key2:value2`. Keys/values containing `:` must 
be quoted. |
 
 ### IAM Assume Role
 
@@ -128,9 +129,7 @@ This enables IAM policies to restrict access based on 
context values:
 ```yaml
 s3.sse.type: sse-kms
 s3.sse.kms.key-id: alias/my-key
-# Configure encryption context as key-value pairs
-s3.sse.kms.encryption-context.department: finance
-s3.sse.kms.encryption-context.project: budget-reports
+s3.sse.kms.encryption-context: {"aws:s3:arn": "arn:aws:s3:::my-bucket/my-file"}
 ```
 
 With encryption context, you can create IAM policies like:
diff --git 
a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java
 
b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java
index 21f0dc0319e..b7c0de3acfc 100644
--- 
a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java
+++ 
b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java
@@ -34,6 +34,8 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.net.URI;
 import java.time.Duration;
+import java.util.Collections;
+import java.util.Map;
 
 /**
  * Factory for creating Native S3 FileSystem instances.
@@ -190,6 +192,16 @@ public class NativeS3FileSystemFactory implements 
FileSystemFactory {
                                     + "Example: 
'arn:aws:kms:us-east-1:123456789:key/12345678-1234-1234-1234-123456789abc' "
                                     + "or 'alias/my-s3-key'");
 
+    public static final ConfigOption<Map<String, String>> 
SSE_KMS_ENCRYPTION_CONTEXT =
+            ConfigOptions.key("s3.sse.kms.encryption-context")
+                    .mapType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Encryption context key-value pairs for SSE-KMS. "
+                                    + "Provides additional authenticated data 
and enables "
+                                    + "fine-grained IAM policy conditions. "
+                                    + "Format: 'key1:value1,key2:value2'");
+
     // IAM Assume Role Configuration
     public static final ConfigOption<String> ASSUME_ROLE_ARN =
             ConfigOptions.key("s3.assume-role.arn")
@@ -312,7 +324,11 @@ public class NativeS3FileSystemFactory implements 
FileSystemFactory {
         boolean pathStyleAccess = config.get(PATH_STYLE_ACCESS);
 
         S3EncryptionConfig encryptionConfig =
-                S3EncryptionConfig.fromConfig(config.get(SSE_TYPE), 
config.get(SSE_KMS_KEY_ID));
+                S3EncryptionConfig.fromConfig(
+                        config.get(SSE_TYPE),
+                        config.get(SSE_KMS_KEY_ID),
+                        config.getOptional(SSE_KMS_ENCRYPTION_CONTEXT)
+                                .orElse(Collections.emptyMap()));
         String entropyInjectionKey = config.get(ENTROPY_INJECT_KEY_OPTION);
         int numEntropyChars = -1;
         if (entropyInjectionKey != null) {
diff --git 
a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3OutputStream.java
 
b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3OutputStream.java
index d125d05fd27..ac3562f5e72 100644
--- 
a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3OutputStream.java
+++ 
b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3OutputStream.java
@@ -195,9 +195,14 @@ class NativeS3OutputStream extends FSDataOutputStream {
             if (encryptionConfig.isEnabled()) {
                 
putRequestBuilder.serverSideEncryption(encryptionConfig.getServerSideEncryption());
                 if (encryptionConfig.getEncryptionType()
-                                == S3EncryptionConfig.EncryptionType.SSE_KMS
-                        && encryptionConfig.getKmsKeyId() != null) {
-                    
putRequestBuilder.ssekmsKeyId(encryptionConfig.getKmsKeyId());
+                        == S3EncryptionConfig.EncryptionType.SSE_KMS) {
+                    if (encryptionConfig.getKmsKeyId() != null) {
+                        
putRequestBuilder.ssekmsKeyId(encryptionConfig.getKmsKeyId());
+                    }
+                    if (encryptionConfig.hasEncryptionContext()) {
+                        putRequestBuilder.ssekmsEncryptionContext(
+                                encryptionConfig.serializeEncryptionContext());
+                    }
                 }
             }
 
diff --git 
a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java
 
b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java
index 656a3f567da..46d089ec23e 100644
--- 
a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java
+++ 
b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java
@@ -54,6 +54,7 @@ import java.net.URI;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -74,7 +75,7 @@ class S3ClientProvider implements AutoCloseableAsync {
     private final S3Client s3Client;
     private final S3TransferManager transferManager;
     private final S3EncryptionConfig encryptionConfig;
-    @Nullable private final AwsCredentialsProvider credentialsProvider;
+    private final AwsCredentialsProvider credentialsProvider;
     @Nullable private final StsClient stsClient;
     private final Duration clientCloseTimeout;
     private final Duration connectionTimeout;
@@ -90,8 +91,8 @@ class S3ClientProvider implements AutoCloseableAsync {
     private S3ClientProvider(
             S3Client s3Client,
             S3TransferManager transferManager,
-            S3EncryptionConfig encryptionConfig,
-            @Nullable AwsCredentialsProvider credentialsProvider,
+            @Nullable S3EncryptionConfig encryptionConfig,
+            AwsCredentialsProvider credentialsProvider,
             @Nullable StsClient stsClient,
             Duration clientCloseTimeout,
             Duration connectionTimeout,
@@ -102,16 +103,23 @@ class S3ClientProvider implements AutoCloseableAsync {
             boolean checksumValidation,
             int maxConnections,
             int maxRetries) {
-        this.s3Client = s3Client;
-        this.transferManager = transferManager;
+        this.s3Client = Objects.requireNonNull(s3Client, "s3Client must not be 
null");
+        this.transferManager =
+                Objects.requireNonNull(transferManager, "transferManager must 
not be null");
         this.encryptionConfig =
                 encryptionConfig != null ? encryptionConfig : 
S3EncryptionConfig.none();
-        this.credentialsProvider = credentialsProvider;
+        this.credentialsProvider =
+                Objects.requireNonNull(credentialsProvider, 
"credentialsProvider must not be null");
         this.stsClient = stsClient;
-        this.clientCloseTimeout = clientCloseTimeout;
-        this.connectionTimeout = connectionTimeout;
-        this.socketTimeout = socketTimeout;
-        this.connectionMaxIdleTime = connectionMaxIdleTime;
+        this.clientCloseTimeout =
+                Objects.requireNonNull(clientCloseTimeout, "clientCloseTimeout 
must not be null");
+        this.connectionTimeout =
+                Objects.requireNonNull(connectionTimeout, "connectionTimeout 
must not be null");
+        this.socketTimeout =
+                Objects.requireNonNull(socketTimeout, "socketTimeout must not 
be null");
+        this.connectionMaxIdleTime =
+                Objects.requireNonNull(
+                        connectionMaxIdleTime, "connectionMaxIdleTime must not 
be null");
         this.pathStyleAccess = pathStyleAccess;
         this.chunkedEncoding = chunkedEncoding;
         this.checksumValidation = checksumValidation;
diff --git 
a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3EncryptionConfig.java
 
b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3EncryptionConfig.java
index 67e869ab992..deae79113d8 100644
--- 
a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3EncryptionConfig.java
+++ 
b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3EncryptionConfig.java
@@ -19,15 +19,20 @@
 package org.apache.flink.fs.s3native;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.StringUtils;
 
 import software.amazon.awssdk.services.s3.model.ServerSideEncryption;
 
 import javax.annotation.Nullable;
 
 import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Locale;
 import java.util.Map;
+import java.util.Objects;
 
 /**
  * Configuration for S3 server-side encryption (SSE).
@@ -76,8 +81,9 @@ public class S3EncryptionConfig implements Serializable {
     private S3EncryptionConfig(
             EncryptionType encryptionType,
             @Nullable String kmsKeyId,
-            Map<String, String> encryptionContext) {
-        this.encryptionType = encryptionType;
+            @Nullable Map<String, String> encryptionContext) {
+        this.encryptionType =
+                Objects.requireNonNull(encryptionType, "encryptionType must 
not be null");
         this.kmsKeyId = kmsKeyId;
         this.encryptionContext =
                 encryptionContext != null
@@ -96,74 +102,49 @@ public class S3EncryptionConfig implements Serializable {
     }
 
     /**
-     * Creates a config for SSE-KMS encryption with the default KMS key.
-     *
-     * <p>Uses the AWS-managed KMS key (aws/s3) for the S3 bucket.
-     */
-    public static S3EncryptionConfig sseKms() {
-        return new S3EncryptionConfig(EncryptionType.SSE_KMS, null);
-    }
-
-    /**
-     * Creates a config for SSE-KMS encryption with a specific KMS key.
-     *
-     * @param kmsKeyId The KMS key ID, ARN, or alias (e.g., 
"arn:aws:kms:region:account:key/key-id"
-     *     or "alias/my-key")
-     */
-    public static S3EncryptionConfig sseKms(String kmsKeyId) {
-        return new S3EncryptionConfig(EncryptionType.SSE_KMS, kmsKeyId);
-    }
-
-    /**
-     * Creates a config for SSE-KMS encryption with a specific KMS key and 
encryption context.
+     * Creates a config for SSE-KMS encryption.
      *
-     * <p>The encryption context is a set of key-value pairs that:
-     *
-     * <ul>
-     *   <li>Provides additional authenticated data (AAD) for encryption
-     *   <li>Can be used in IAM policy conditions for fine-grained access 
control
-     *   <li>Is logged in AWS CloudTrail for auditing
-     * </ul>
-     *
-     * <p>Example: You might include context like {"department": "finance", 
"project": "budget"} to
-     * restrict which principals can encrypt/decrypt based on these values.
-     *
-     * @param kmsKeyId The KMS key ID, ARN, or alias
-     * @param encryptionContext The encryption context key-value pairs
+     * @param kmsKeyId The KMS key ID, ARN, or alias; null uses the 
AWS-managed default key
+     * @param encryptionContext Optional key-value pairs for IAM policy 
conditions and CloudTrail
+     *     auditing; null or empty means no context
      * @see <a 
href="https://docs.aws.amazon.com/kms/latest/developerguide/encrypt_context.html";>AWS
      *     KMS Encryption Context</a>
      */
     public static S3EncryptionConfig sseKms(
-            String kmsKeyId, Map<String, String> encryptionContext) {
-        return new S3EncryptionConfig(EncryptionType.SSE_KMS, kmsKeyId, 
encryptionContext);
+            @Nullable String kmsKeyId, @Nullable Map<String, String> 
encryptionContext) {
+        return new S3EncryptionConfig(
+                EncryptionType.SSE_KMS,
+                StringUtils.isNullOrWhitespaceOnly(kmsKeyId) ? null : kmsKeyId,
+                encryptionContext);
     }
 
     /**
      * Creates an encryption config from configuration strings.
      *
-     * @param encryptionTypeStr The encryption type: "none", "sse-s3", 
"sse-kms", or "SSE_S3",
-     *     "SSE_KMS"
+     * @param encryptionTypeStr The encryption type: "none", "sse-s3", 
"sse-kms", "aws:kms",
+     *     "aes256" (case-insensitive)
      * @param kmsKeyId The KMS key ID (required for SSE-KMS, ignored for other 
types)
      * @return The encryption configuration
      * @throws IllegalArgumentException if the encryption type is invalid
      */
     public static S3EncryptionConfig fromConfig(
-            @Nullable String encryptionTypeStr, @Nullable String kmsKeyId) {
-        if (encryptionTypeStr == null
-                || encryptionTypeStr.isEmpty()
+            @Nullable String encryptionTypeStr,
+            @Nullable String kmsKeyId,
+            Map<String, String> encryptionContext) {
+        if (StringUtils.isNullOrWhitespaceOnly(encryptionTypeStr)
                 || "none".equalsIgnoreCase(encryptionTypeStr)) {
             return none();
         }
 
-        String normalizedType = encryptionTypeStr.toUpperCase().replace("-", 
"_").replace(":", "_");
+        String normalizedType = encryptionTypeStr.toLowerCase(Locale.ROOT);
 
         switch (normalizedType) {
-            case "SSE_S3":
-            case "AES256":
+            case "sse-s3":
+            case "aes256":
                 return sseS3();
-            case "SSE_KMS":
-            case "AWS_KMS":
-                return kmsKeyId != null && !kmsKeyId.isEmpty() ? 
sseKms(kmsKeyId) : sseKms();
+            case "sse-kms":
+            case "aws:kms":
+                return sseKms(kmsKeyId, encryptionContext);
             default:
                 throw new IllegalArgumentException(
                         "Unknown encryption type: "
@@ -220,6 +201,28 @@ public class S3EncryptionConfig implements Serializable {
         }
     }
 
+    public String serializeEncryptionContext() {
+        StringBuilder json = new StringBuilder("{");
+        boolean first = true;
+        for (Map.Entry<String, String> entry : encryptionContext.entrySet()) {
+            if (!first) {
+                json.append(",");
+            }
+            json.append("\"")
+                    .append(escapeJson(entry.getKey()))
+                    .append("\":\"")
+                    .append(escapeJson(entry.getValue()))
+                    .append("\"");
+            first = false;
+        }
+        json.append("}");
+        return 
Base64.getEncoder().encodeToString(json.toString().getBytes(StandardCharsets.UTF_8));
+    }
+
+    private String escapeJson(String value) {
+        return value.replace("\\", "\\\\").replace("\"", "\\\"");
+    }
+
     @Override
     public String toString() {
         StringBuilder sb = new 
StringBuilder("S3EncryptionConfig{type=").append(encryptionType);
diff --git 
a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3ObjectOperations.java
 
b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3ObjectOperations.java
index 01bc660f3ac..14ff1eeab12 100644
--- 
a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3ObjectOperations.java
+++ 
b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3ObjectOperations.java
@@ -85,7 +85,6 @@ import java.util.stream.Collectors;
  * <ul>
  *   <li>SSE-C (customer-provided keys) via a KeyProvider interface
  *   <li>Client-side encryption via an EncryptionHandler interface
- *   <li>Encryption context for SSE-KMS (see HADOOP-19197)
  * </ul>
  *
  * <p><b>S3 URI Handling:</b> The {@link #extractKey(Path)} and {@link 
#extractBucketName(Path)}
@@ -155,7 +154,7 @@ public class NativeS3ObjectOperations {
             }
             if (encryptionConfig.hasEncryptionContext()) {
                 requestBuilder.ssekmsEncryptionContext(
-                        
serializeEncryptionContext(encryptionConfig.getEncryptionContext()));
+                        encryptionConfig.serializeEncryptionContext());
             }
         }
     }
@@ -171,36 +170,11 @@ public class NativeS3ObjectOperations {
             }
             if (encryptionConfig.hasEncryptionContext()) {
                 requestBuilder.ssekmsEncryptionContext(
-                        
serializeEncryptionContext(encryptionConfig.getEncryptionContext()));
+                        encryptionConfig.serializeEncryptionContext());
             }
         }
     }
 
-    /**
-     * Serializes the encryption context map to a Base64-encoded JSON string 
as required by S3 API.
-     */
-    private String serializeEncryptionContext(java.util.Map<String, String> 
context) {
-        StringBuilder json = new StringBuilder("{");
-        boolean first = true;
-        for (java.util.Map.Entry<String, String> entry : context.entrySet()) {
-            if (!first) {
-                json.append(",");
-            }
-            json.append("\"")
-                    .append(escapeJson(entry.getKey()))
-                    .append("\":\"")
-                    .append(escapeJson(entry.getValue()))
-                    .append("\"");
-            first = false;
-        }
-        json.append("}");
-        return 
java.util.Base64.getEncoder().encodeToString(json.toString().getBytes());
-    }
-
-    private String escapeJson(String value) {
-        return value.replace("\\", "\\\\").replace("\"", "\\\"");
-    }
-
     public UploadPartResult uploadPart(
             String key, String uploadId, int partNumber, File inputFile, long 
length)
             throws IOException {
@@ -275,9 +249,8 @@ public class NativeS3ObjectOperations {
                                                 }
                                                 if 
(encryptionConfig.hasEncryptionContext()) {
                                                     
req.ssekmsEncryptionContext(
-                                                            
serializeEncryptionContext(
-                                                                    
encryptionConfig
-                                                                            
.getEncryptionContext()));
+                                                            encryptionConfig
+                                                                    
.serializeEncryptionContext());
                                                 }
                                             }
                                         }
diff --git 
a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3BlockLocationTest.java
 
b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3BlockLocationTest.java
new file mode 100644
index 00000000000..3564a3a3015
--- /dev/null
+++ 
b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3BlockLocationTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3native;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link S3BlockLocation}. */
+class S3BlockLocationTest {
+
+    @Test
+    void testAccessors() {
+        S3BlockLocation loc = new S3BlockLocation(new String[] {"localhost"}, 
100L, 512L);
+
+        assertThat(loc.getHosts()).containsExactly("localhost");
+        assertThat(loc.getOffset()).isEqualTo(100L);
+        assertThat(loc.getLength()).isEqualTo(512L);
+    }
+
+    @Test
+    void testCompareToOrdersByOffset() {
+        S3BlockLocation first = new S3BlockLocation(new String[] 
{"localhost"}, 0L, 100L);
+        S3BlockLocation second = new S3BlockLocation(new String[] 
{"localhost"}, 100L, 100L);
+
+        assertThat(first.compareTo(second)).isNegative();
+        assertThat(second.compareTo(first)).isPositive();
+        assertThat(first.compareTo(first)).isZero();
+    }
+}
diff --git 
a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3EncryptionConfigTest.java
 
b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3EncryptionConfigTest.java
new file mode 100644
index 00000000000..cfda10a1452
--- /dev/null
+++ 
b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3EncryptionConfigTest.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3native;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import software.amazon.awssdk.services.s3.model.ServerSideEncryption;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import static 
org.apache.flink.fs.s3native.S3EncryptionConfig.EncryptionType.NONE;
+import static 
org.apache.flink.fs.s3native.S3EncryptionConfig.EncryptionType.SSE_KMS;
+import static 
org.apache.flink.fs.s3native.S3EncryptionConfig.EncryptionType.SSE_S3;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link S3EncryptionConfig}. */
+class S3EncryptionConfigTest {
+
+    @Test
+    void sseKms_withContext_contextStoredDefensively() {
+        Map<String, String> ctx =
+                new HashMap<>(Map.of("aws:s3:arn", 
"arn:aws:s3:::my-bucket/my-file"));
+        S3EncryptionConfig c = S3EncryptionConfig.sseKms("key-id", ctx);
+        ctx.put("extra", "value");
+
+        assertThat(c.getEncryptionContext())
+                .isEqualTo(Map.of("aws:s3:arn", 
"arn:aws:s3:::my-bucket/my-file"));
+        assertThat(c.hasEncryptionContext()).isTrue();
+    }
+
+    @Test
+    void sseKms_nullKeyId_keyIdIsNull() {
+        S3EncryptionConfig c = S3EncryptionConfig.sseKms(null, 
Collections.emptyMap());
+
+        assertThat(c.getKmsKeyId()).isNull();
+    }
+
+    @Test
+    void sseKms_returnedContext_isUnmodifiable() {
+        S3EncryptionConfig c =
+                S3EncryptionConfig.sseKms(
+                        "key-id", Map.of("aws:s3:arn", 
"arn:aws:s3:::my-bucket/my-file"));
+
+        assertThatThrownBy(() -> c.getEncryptionContext().put("x", "y"))
+                .isInstanceOf(UnsupportedOperationException.class);
+    }
+
+    @ParameterizedTest
+    @MethodSource
+    void sseKms_contextAbsent_contextIsEmpty(Map<String, String> context) {
+        S3EncryptionConfig c = S3EncryptionConfig.sseKms("key-id", context);
+
+        assertThat(c.getEncryptionContext()).isEmpty();
+        assertThat(c.hasEncryptionContext()).isFalse();
+    }
+
+    static Stream<Arguments> sseKms_contextAbsent_contextIsEmpty() {
+        return Stream.of(Arguments.of((Object) null), 
Arguments.of(Collections.emptyMap()));
+    }
+
+    @ParameterizedTest
+    @MethodSource
+    void getServerSideEncryption_allTypes_returnsCorrectSseValue(
+            String configType, ServerSideEncryption expected) {
+        S3EncryptionConfig c =
+                S3EncryptionConfig.fromConfig(configType, null, 
Collections.emptyMap());
+
+        assertThat(c.getServerSideEncryption()).isEqualTo(expected);
+    }
+
+    static Stream<Arguments> 
getServerSideEncryption_allTypes_returnsCorrectSseValue() {
+        return Stream.of(
+                Arguments.of(null, null),
+                Arguments.of("sse-s3", ServerSideEncryption.AES256),
+                Arguments.of("sse-kms", ServerSideEncryption.AWS_KMS));
+    }
+
+    @ParameterizedTest
+    @MethodSource
+    void fromConfig_typeVariants_returnExpectedType(
+            String input, S3EncryptionConfig.EncryptionType expected) {
+        assertThat(
+                        S3EncryptionConfig.fromConfig(input, null, 
Collections.emptyMap())
+                                .getEncryptionType())
+                .isEqualTo(expected);
+    }
+
+    static Stream<Arguments> fromConfig_typeVariants_returnExpectedType() {
+        return Stream.of(
+                Arguments.of(null, NONE),
+                Arguments.of("", NONE),
+                Arguments.of("none", NONE),
+                Arguments.of("NONE", NONE),
+                Arguments.of("   ", NONE),
+                Arguments.of("sse-s3", SSE_S3),
+                Arguments.of("AES256", SSE_S3),
+                Arguments.of("sse-kms", SSE_KMS),
+                Arguments.of("aws:kms", SSE_KMS));
+    }
+
+    @Test
+    void fromConfig_sseKmsWithKeyAndContext_keyAndContextPreserved() {
+        S3EncryptionConfig result =
+                S3EncryptionConfig.fromConfig(
+                        "sse-kms",
+                        "my-key",
+                        Map.of("aws:s3:arn", 
"arn:aws:s3:::my-bucket/my-file"));
+
+        assertThat(result.getKmsKeyId()).isEqualTo("my-key");
+        assertThat(result.getEncryptionContext())
+                .isEqualTo(Map.of("aws:s3:arn", 
"arn:aws:s3:::my-bucket/my-file"));
+    }
+
+    @ParameterizedTest
+    @MethodSource
+    void fromConfig_sseKmsWithNoKeyId_keyIdIsNull(String kmsKeyId) {
+        assertThat(
+                        S3EncryptionConfig.fromConfig("sse-kms", kmsKeyId, 
Collections.emptyMap())
+                                .getKmsKeyId())
+                .isNull();
+    }
+
+    static Stream<Arguments> fromConfig_sseKmsWithNoKeyId_keyIdIsNull() {
+        return Stream.of(Arguments.of((Object) null), Arguments.of(""));
+    }
+
+    @Test
+    void fromConfig_sseKmsDefaultKeyWithContext_contextPreserved() {
+        assertThat(
+                        S3EncryptionConfig.fromConfig(
+                                        "sse-kms",
+                                        null,
+                                        Map.of("aws:s3:arn", 
"arn:aws:s3:::my-bucket/my-file"))
+                                .hasEncryptionContext())
+                .isTrue();
+    }
+
+    @Test
+    void fromConfig_sseS3WithContext_contextIgnored() {
+        S3EncryptionConfig c =
+                S3EncryptionConfig.fromConfig(
+                        "sse-s3", null, Map.of("aws:s3:arn", 
"arn:aws:s3:::my-bucket/my-file"));
+
+        assertThat(c.getEncryptionType()).isEqualTo(SSE_S3);
+        assertThat(c.getEncryptionContext()).isEmpty();
+    }
+
+    @Test
+    void fromConfig_sseS3_kmsKeyIdIgnored() {
+        S3EncryptionConfig c =
+                S3EncryptionConfig.fromConfig("sse-s3", "some-key", 
Collections.emptyMap());
+
+        assertThat(c.getKmsKeyId()).isNull();
+    }
+
+    @Test
+    void fromConfig_unknownType_throwsIllegalArgument() {
+        assertThatThrownBy(
+                        () ->
+                                S3EncryptionConfig.fromConfig(
+                                        "invalid-type", null, 
Collections.emptyMap()))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("invalid-type");
+    }
+
+    @ParameterizedTest
+    @MethodSource
+    void isEnabled_encryptionType_returnsCorrectState(S3EncryptionConfig 
config, boolean expected) {
+        assertThat(config.isEnabled()).isEqualTo(expected);
+    }
+
+    static Stream<Arguments> isEnabled_encryptionType_returnsCorrectState() {
+        return Stream.of(
+                Arguments.of(
+                        S3EncryptionConfig.fromConfig(null, null, 
Collections.emptyMap()), false),
+                Arguments.of(
+                        S3EncryptionConfig.fromConfig("sse-s3", null, 
Collections.emptyMap()),
+                        true),
+                Arguments.of(
+                        S3EncryptionConfig.fromConfig("sse-kms", null, 
Collections.emptyMap()),
+                        true));
+    }
+
+    @ParameterizedTest
+    @MethodSource
+    void serializeEncryptionContext_exactOutput_correctBase64Json(
+            Map<String, String> context, String expectedDecoded) {
+        S3EncryptionConfig c = S3EncryptionConfig.sseKms(null, context);
+        String decoded = new 
String(Base64.getDecoder().decode(c.serializeEncryptionContext()));
+
+        assertThat(decoded).isEqualTo(expectedDecoded);
+    }
+
+    static Stream<Arguments> 
serializeEncryptionContext_exactOutput_correctBase64Json() {
+        return Stream.of(
+                Arguments.of(Collections.emptyMap(), "{}"),
+                Arguments.of(Map.of("k", "v"), "{\"k\":\"v\"}"));
+    }
+
+    @Test
+    void serializeEncryptionContext_multipleEntries_allEntriesPresent() {
+        S3EncryptionConfig c = S3EncryptionConfig.sseKms(null, Map.of("k1", 
"v1", "k2", "v2"));
+        String decoded = new 
String(Base64.getDecoder().decode(c.serializeEncryptionContext()));
+
+        assertThat(decoded).contains("\"k1\":\"v1\"", "\"k2\":\"v2\"");
+    }
+
+    @ParameterizedTest
+    @MethodSource
+    void serializeEncryptionContext_jsonSpecialChars_escapedCorrectly(
+            String key, String value, String expectedFragment) {
+        S3EncryptionConfig c = S3EncryptionConfig.sseKms(null, Map.of(key, 
value));
+        String decoded = new 
String(Base64.getDecoder().decode(c.serializeEncryptionContext()));
+
+        assertThat(decoded).contains(expectedFragment);
+    }
+
+    static Stream<Arguments> 
serializeEncryptionContext_jsonSpecialChars_escapedCorrectly() {
+        return Stream.of(
+                Arguments.of("k", "val\"ue", "\"k\":\"val\\\"ue\""),
+                Arguments.of("k", "val\\ue", "\"k\":\"val\\\\ue\""),
+                Arguments.of("k\"ey", "v", "\"k\\\"ey\":\"v\""),
+                Arguments.of("k\\ey", "v", "\"k\\\\ey\":\"v\""),
+                Arguments.of("k", "\\\"", "\"k\":\"\\\\\\\"\""));
+    }
+
+    @ParameterizedTest
+    @MethodSource
+    void serialization_roundTrip_preservesAllFields(S3EncryptionConfig config) 
throws Exception {
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        new ObjectOutputStream(bos).writeObject(config);
+        S3EncryptionConfig copy =
+                (S3EncryptionConfig)
+                        new ObjectInputStream(new 
ByteArrayInputStream(bos.toByteArray()))
+                                .readObject();
+
+        
assertThat(copy.getEncryptionType()).isEqualTo(config.getEncryptionType());
+        assertThat(copy.getKmsKeyId()).isEqualTo(config.getKmsKeyId());
+        
assertThat(copy.getEncryptionContext()).isEqualTo(config.getEncryptionContext());
+    }
+
+    static Stream<Arguments> serialization_roundTrip_preservesAllFields() {
+        return Stream.of(
+                Arguments.of(S3EncryptionConfig.sseKms("key-id", Map.of("k", 
"v"))),
+                Arguments.of(S3EncryptionConfig.none()),
+                Arguments.of(S3EncryptionConfig.sseS3()),
+                Arguments.of(S3EncryptionConfig.sseKms(null, null)));
+    }
+}
diff --git 
a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3ExceptionUtilsTest.java
 
b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3ExceptionUtilsTest.java
new file mode 100644
index 00000000000..c60fd422c6e
--- /dev/null
+++ 
b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3ExceptionUtilsTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3native;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import software.amazon.awssdk.awscore.exception.AwsErrorDetails;
+import software.amazon.awssdk.awscore.exception.AwsServiceException;
+import software.amazon.awssdk.services.s3.model.S3Exception;
+
+import java.io.IOException;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link S3ExceptionUtils}. */
+class S3ExceptionUtilsTest {
+
+    static Stream<Arguments> toIoExceptionCases() {
+        return Stream.of(
+                Arguments.of(
+                        s3Exception(
+                                404,
+                                AwsErrorDetails.builder().errorMessage("key 
not found").build()),
+                        "delete object",
+                        "delete object (HTTP 404: key not found)"),
+                Arguments.of(
+                        s3Exception(500, "internal error"),
+                        "put object",
+                        "put object (HTTP 500: internal error)"));
+    }
+
+    @ParameterizedTest
+    @MethodSource("toIoExceptionCases")
+    void toIOException_contextAndStatusCode_formatsMessageAndPreservesCause(
+            S3Exception cause, String context, String expectedMessage) {
+        IOException result = S3ExceptionUtils.toIOException(context, cause);
+
+        assertThat(result.getMessage()).contains(expectedMessage);
+        assertThat(result.getCause()).isSameAs(cause);
+    }
+
+    static Stream<Arguments> errorMessageExactCases() {
+        return Stream.of(
+                Arguments.of(
+                        s3Exception(
+                                404,
+                                AwsErrorDetails.builder()
+                                        .errorMessage("The specified key does 
not exist.")
+                                        .build()),
+                        "The specified key does not exist."),
+                Arguments.of(s3ExceptionStatusOnly(500), "Unknown S3 error"));
+    }
+
+    @ParameterizedTest
+    @MethodSource("errorMessageExactCases")
+    void errorMessage_exactReturn_matchesExpected(S3Exception e, String 
expected) {
+        assertThat(S3ExceptionUtils.errorMessage(e)).isEqualTo(expected);
+    }
+
+    static Stream<Arguments> errorMessageFallbackCases() {
+        return Stream.of(
+                Arguments.of(s3Exception(500, "connection timeout"), 
"connection timeout"),
+                Arguments.of(
+                        s3ExceptionWithMessageAndDetails(
+                                500,
+                                "fallback message",
+                                
AwsErrorDetails.builder().errorCode("InternalError").build()),
+                        "fallback message"));
+    }
+
+    @ParameterizedTest
+    @MethodSource("errorMessageFallbackCases")
+    void errorMessage_fallbackToExceptionMessage_containsExpected(
+            S3Exception e, String expectedMessage) {
+        assertThat(S3ExceptionUtils.errorMessage(e)).contains(expectedMessage);
+    }
+
+    static Stream<Arguments> errorCodeAllCases() {
+        return Stream.of(
+                Arguments.of(
+                        s3Exception(404, 
AwsErrorDetails.builder().errorCode("NoSuchKey").build()),
+                        "NoSuchKey"),
+                Arguments.of(s3Exception(500, "some error"), "Unknown"),
+                Arguments.of(
+                        s3Exception(
+                                500,
+                                AwsErrorDetails.builder()
+                                        .errorMessage("Something went wrong")
+                                        .build()),
+                        "Unknown"));
+    }
+
+    @ParameterizedTest
+    @MethodSource("errorCodeAllCases")
+    void errorCode_allCases_returnsExpected(S3Exception e, String expected) {
+        assertThat(S3ExceptionUtils.errorCode(e)).isEqualTo(expected);
+    }
+
+    private static AwsServiceException s3Exception(int statusCode, 
AwsErrorDetails details) {
+        return 
S3Exception.builder().statusCode(statusCode).awsErrorDetails(details).build();
+    }
+
+    private static AwsServiceException s3Exception(int statusCode, String 
message) {
+        return 
S3Exception.builder().statusCode(statusCode).message(message).build();
+    }
+
+    private static AwsServiceException s3ExceptionWithMessageAndDetails(
+            int statusCode, String message, AwsErrorDetails details) {
+        return S3Exception.builder()
+                .statusCode(statusCode)
+                .message(message)
+                .awsErrorDetails(details)
+                .build();
+    }
+
+    private static AwsServiceException s3ExceptionStatusOnly(int statusCode) {
+        return S3Exception.builder().statusCode(statusCode).build();
+    }
+}
diff --git 
a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3FileStatusTest.java
 
b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3FileStatusTest.java
new file mode 100644
index 00000000000..aab96258989
--- /dev/null
+++ 
b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3FileStatusTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3native;
+
+import org.apache.flink.core.fs.Path;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link S3FileStatus}. */
+class S3FileStatusTest {
+
+    private static final Path PATH = new Path("s3://bucket/key");
+
+    static Stream<Arguments> fileSizeAndModTime() {
+        return Stream.of(
+                Arguments.of(0L, 0L),
+                Arguments.of(1024L, 987654321L),
+                Arguments.of(Long.MAX_VALUE, Long.MAX_VALUE));
+    }
+
+    @ParameterizedTest
+    @MethodSource("fileSizeAndModTime")
+    void withFile_variousSizes_allFieldsSetCorrectly(long size, long modTime) {
+        S3FileStatus s = S3FileStatus.withFile(size, modTime, PATH);
+
+        assertThat(s.getLen()).isEqualTo(size);
+        assertThat(s.getBlockSize()).isEqualTo(size);
+        assertThat(s.getModificationTime()).isEqualTo(modTime);
+        assertThat(s.getAccessTime()).isEqualTo(0L);
+        assertThat(s.isDir()).isFalse();
+        assertThat(s.getPath()).isEqualTo(PATH);
+        assertThat(s.getReplication()).isEqualTo((short) 1);
+    }
+
+    @Test
+    void withDirectory_anyPath_allFieldsSetCorrectly() {
+        S3FileStatus s = S3FileStatus.withDirectory(PATH);
+
+        assertThat(s.getLen()).isEqualTo(0L);
+        assertThat(s.getBlockSize()).isEqualTo(0L);
+        assertThat(s.getModificationTime()).isEqualTo(0L);
+        assertThat(s.getAccessTime()).isEqualTo(0L);
+        assertThat(s.isDir()).isTrue();
+        assertThat(s.getPath()).isEqualTo(PATH);
+        assertThat(s.getReplication()).isEqualTo((short) 1);
+    }
+
+    @Test
+    void constructor_nonZeroAccessTime_accessTimeIsPreserved() {
+        S3FileStatus s = new S3FileStatus(1024L, 1024L, 100L, 999L, false, 
PATH);
+
+        assertThat(s.getAccessTime()).isEqualTo(999L);
+    }
+}


Reply via email to