This is an automated email from the ASF dual-hosted git repository.
gaborgsomogyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 1aa9c855d0c [FLINK-39113][s3] Fix s3.sse.kms.encryption-context config
in native s3 connector
1aa9c855d0c is described below
commit 1aa9c855d0c0fc2f12b0e221cc348fe9385b861c
Author: Gabor Somogyi <[email protected]>
AuthorDate: Tue May 5 20:21:43 2026 +0200
[FLINK-39113][s3] Fix s3.sse.kms.encryption-context config in native s3
connector
---
flink-filesystems/flink-s3-fs-native/README.md | 5 +-
.../fs/s3native/NativeS3FileSystemFactory.java | 18 +-
.../flink/fs/s3native/NativeS3OutputStream.java | 11 +-
.../apache/flink/fs/s3native/S3ClientProvider.java | 28 ++-
.../flink/fs/s3native/S3EncryptionConfig.java | 99 ++++----
.../s3native/writer/NativeS3ObjectOperations.java | 35 +--
.../flink/fs/s3native/S3BlockLocationTest.java | 46 ++++
.../flink/fs/s3native/S3EncryptionConfigTest.java | 276 +++++++++++++++++++++
.../flink/fs/s3native/S3ExceptionUtilsTest.java | 137 ++++++++++
.../apache/flink/fs/s3native/S3FileStatusTest.java | 77 ++++++
10 files changed, 636 insertions(+), 96 deletions(-)
diff --git a/flink-filesystems/flink-s3-fs-native/README.md
b/flink-filesystems/flink-s3-fs-native/README.md
index 7f6cfc56bed..6dcf2c74ff0 100644
--- a/flink-filesystems/flink-s3-fs-native/README.md
+++ b/flink-filesystems/flink-s3-fs-native/README.md
@@ -79,6 +79,7 @@ input.sinkTo(FileSink.forRowFormat(new
Path("s3://my-bucket/output"),
|-----|---------|-------------|
| s3.sse.type | none | Encryption type: `none`, `sse-s3` (AES256), `sse-kms`
(AWS KMS) |
| s3.sse.kms.key-id | (none) | KMS key ID/ARN/alias for SSE-KMS (uses default
aws/s3 key if not specified) |
+| s3.sse.kms.encryption-context | (none) | Encryption context key-value pairs
for SSE-KMS. Format: `key1:value1,key2:value2`. Keys/values containing `:` must
be quoted. |
### IAM Assume Role
@@ -128,9 +129,7 @@ This enables IAM policies to restrict access based on
context values:
```yaml
s3.sse.type: sse-kms
s3.sse.kms.key-id: alias/my-key
-# Configure encryption context as key-value pairs
-s3.sse.kms.encryption-context.department: finance
-s3.sse.kms.encryption-context.project: budget-reports
+s3.sse.kms.encryption-context: {"aws:s3:arn": "arn:aws:s3:::my-bucket/my-file"}
```
With encryption context, you can create IAM policies like:
diff --git
a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java
b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java
index 21f0dc0319e..b7c0de3acfc 100644
---
a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java
+++
b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java
@@ -34,6 +34,8 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.URI;
import java.time.Duration;
+import java.util.Collections;
+import java.util.Map;
/**
* Factory for creating Native S3 FileSystem instances.
@@ -190,6 +192,16 @@ public class NativeS3FileSystemFactory implements
FileSystemFactory {
+ "Example:
'arn:aws:kms:us-east-1:123456789:key/12345678-1234-1234-1234-123456789abc' "
+ "or 'alias/my-s3-key'");
+ public static final ConfigOption<Map<String, String>>
SSE_KMS_ENCRYPTION_CONTEXT =
+ ConfigOptions.key("s3.sse.kms.encryption-context")
+ .mapType()
+ .noDefaultValue()
+ .withDescription(
+ "Encryption context key-value pairs for SSE-KMS. "
+ + "Provides additional authenticated data
and enables "
+ + "fine-grained IAM policy conditions. "
+ + "Format: 'key1:value1,key2:value2'");
+
// IAM Assume Role Configuration
public static final ConfigOption<String> ASSUME_ROLE_ARN =
ConfigOptions.key("s3.assume-role.arn")
@@ -312,7 +324,11 @@ public class NativeS3FileSystemFactory implements
FileSystemFactory {
boolean pathStyleAccess = config.get(PATH_STYLE_ACCESS);
S3EncryptionConfig encryptionConfig =
- S3EncryptionConfig.fromConfig(config.get(SSE_TYPE),
config.get(SSE_KMS_KEY_ID));
+ S3EncryptionConfig.fromConfig(
+ config.get(SSE_TYPE),
+ config.get(SSE_KMS_KEY_ID),
+ config.getOptional(SSE_KMS_ENCRYPTION_CONTEXT)
+ .orElse(Collections.emptyMap()));
String entropyInjectionKey = config.get(ENTROPY_INJECT_KEY_OPTION);
int numEntropyChars = -1;
if (entropyInjectionKey != null) {
diff --git
a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3OutputStream.java
b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3OutputStream.java
index d125d05fd27..ac3562f5e72 100644
---
a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3OutputStream.java
+++
b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3OutputStream.java
@@ -195,9 +195,14 @@ class NativeS3OutputStream extends FSDataOutputStream {
if (encryptionConfig.isEnabled()) {
putRequestBuilder.serverSideEncryption(encryptionConfig.getServerSideEncryption());
if (encryptionConfig.getEncryptionType()
- == S3EncryptionConfig.EncryptionType.SSE_KMS
- && encryptionConfig.getKmsKeyId() != null) {
-
putRequestBuilder.ssekmsKeyId(encryptionConfig.getKmsKeyId());
+ == S3EncryptionConfig.EncryptionType.SSE_KMS) {
+ if (encryptionConfig.getKmsKeyId() != null) {
+
putRequestBuilder.ssekmsKeyId(encryptionConfig.getKmsKeyId());
+ }
+ if (encryptionConfig.hasEncryptionContext()) {
+ putRequestBuilder.ssekmsEncryptionContext(
+ encryptionConfig.serializeEncryptionContext());
+ }
}
}
diff --git
a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java
b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java
index 656a3f567da..46d089ec23e 100644
---
a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java
+++
b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java
@@ -54,6 +54,7 @@ import java.net.URI;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
+import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -74,7 +75,7 @@ class S3ClientProvider implements AutoCloseableAsync {
private final S3Client s3Client;
private final S3TransferManager transferManager;
private final S3EncryptionConfig encryptionConfig;
- @Nullable private final AwsCredentialsProvider credentialsProvider;
+ private final AwsCredentialsProvider credentialsProvider;
@Nullable private final StsClient stsClient;
private final Duration clientCloseTimeout;
private final Duration connectionTimeout;
@@ -90,8 +91,8 @@ class S3ClientProvider implements AutoCloseableAsync {
private S3ClientProvider(
S3Client s3Client,
S3TransferManager transferManager,
- S3EncryptionConfig encryptionConfig,
- @Nullable AwsCredentialsProvider credentialsProvider,
+ @Nullable S3EncryptionConfig encryptionConfig,
+ AwsCredentialsProvider credentialsProvider,
@Nullable StsClient stsClient,
Duration clientCloseTimeout,
Duration connectionTimeout,
@@ -102,16 +103,23 @@ class S3ClientProvider implements AutoCloseableAsync {
boolean checksumValidation,
int maxConnections,
int maxRetries) {
- this.s3Client = s3Client;
- this.transferManager = transferManager;
+ this.s3Client = Objects.requireNonNull(s3Client, "s3Client must not be
null");
+ this.transferManager =
+ Objects.requireNonNull(transferManager, "transferManager must
not be null");
this.encryptionConfig =
encryptionConfig != null ? encryptionConfig :
S3EncryptionConfig.none();
- this.credentialsProvider = credentialsProvider;
+ this.credentialsProvider =
+ Objects.requireNonNull(credentialsProvider,
"credentialsProvider must not be null");
this.stsClient = stsClient;
- this.clientCloseTimeout = clientCloseTimeout;
- this.connectionTimeout = connectionTimeout;
- this.socketTimeout = socketTimeout;
- this.connectionMaxIdleTime = connectionMaxIdleTime;
+ this.clientCloseTimeout =
+ Objects.requireNonNull(clientCloseTimeout, "clientCloseTimeout
must not be null");
+ this.connectionTimeout =
+ Objects.requireNonNull(connectionTimeout, "connectionTimeout
must not be null");
+ this.socketTimeout =
+ Objects.requireNonNull(socketTimeout, "socketTimeout must not
be null");
+ this.connectionMaxIdleTime =
+ Objects.requireNonNull(
+ connectionMaxIdleTime, "connectionMaxIdleTime must not
be null");
this.pathStyleAccess = pathStyleAccess;
this.chunkedEncoding = chunkedEncoding;
this.checksumValidation = checksumValidation;
diff --git
a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3EncryptionConfig.java
b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3EncryptionConfig.java
index 67e869ab992..deae79113d8 100644
---
a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3EncryptionConfig.java
+++
b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3EncryptionConfig.java
@@ -19,15 +19,20 @@
package org.apache.flink.fs.s3native;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.StringUtils;
import software.amazon.awssdk.services.s3.model.ServerSideEncryption;
import javax.annotation.Nullable;
import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
+import java.util.Locale;
import java.util.Map;
+import java.util.Objects;
/**
* Configuration for S3 server-side encryption (SSE).
@@ -76,8 +81,9 @@ public class S3EncryptionConfig implements Serializable {
private S3EncryptionConfig(
EncryptionType encryptionType,
@Nullable String kmsKeyId,
- Map<String, String> encryptionContext) {
- this.encryptionType = encryptionType;
+ @Nullable Map<String, String> encryptionContext) {
+ this.encryptionType =
+ Objects.requireNonNull(encryptionType, "encryptionType must
not be null");
this.kmsKeyId = kmsKeyId;
this.encryptionContext =
encryptionContext != null
@@ -96,74 +102,49 @@ public class S3EncryptionConfig implements Serializable {
}
/**
- * Creates a config for SSE-KMS encryption with the default KMS key.
- *
- * <p>Uses the AWS-managed KMS key (aws/s3) for the S3 bucket.
- */
- public static S3EncryptionConfig sseKms() {
- return new S3EncryptionConfig(EncryptionType.SSE_KMS, null);
- }
-
- /**
- * Creates a config for SSE-KMS encryption with a specific KMS key.
- *
- * @param kmsKeyId The KMS key ID, ARN, or alias (e.g.,
"arn:aws:kms:region:account:key/key-id"
- * or "alias/my-key")
- */
- public static S3EncryptionConfig sseKms(String kmsKeyId) {
- return new S3EncryptionConfig(EncryptionType.SSE_KMS, kmsKeyId);
- }
-
- /**
- * Creates a config for SSE-KMS encryption with a specific KMS key and
encryption context.
+ * Creates a config for SSE-KMS encryption.
*
- * <p>The encryption context is a set of key-value pairs that:
- *
- * <ul>
- * <li>Provides additional authenticated data (AAD) for encryption
- * <li>Can be used in IAM policy conditions for fine-grained access
control
- * <li>Is logged in AWS CloudTrail for auditing
- * </ul>
- *
- * <p>Example: You might include context like {"department": "finance",
"project": "budget"} to
- * restrict which principals can encrypt/decrypt based on these values.
- *
- * @param kmsKeyId The KMS key ID, ARN, or alias
- * @param encryptionContext The encryption context key-value pairs
+ * @param kmsKeyId The KMS key ID, ARN, or alias; null uses the
AWS-managed default key
+ * @param encryptionContext Optional key-value pairs for IAM policy
conditions and CloudTrail
+ * auditing; null or empty means no context
* @see <a
href="https://docs.aws.amazon.com/kms/latest/developerguide/encrypt_context.html">AWS
* KMS Encryption Context</a>
*/
public static S3EncryptionConfig sseKms(
- String kmsKeyId, Map<String, String> encryptionContext) {
- return new S3EncryptionConfig(EncryptionType.SSE_KMS, kmsKeyId,
encryptionContext);
+ @Nullable String kmsKeyId, @Nullable Map<String, String>
encryptionContext) {
+ return new S3EncryptionConfig(
+ EncryptionType.SSE_KMS,
+ StringUtils.isNullOrWhitespaceOnly(kmsKeyId) ? null : kmsKeyId,
+ encryptionContext);
}
/**
* Creates an encryption config from configuration strings.
*
- * @param encryptionTypeStr The encryption type: "none", "sse-s3",
"sse-kms", or "SSE_S3",
- * "SSE_KMS"
+ * @param encryptionTypeStr The encryption type: "none", "sse-s3",
"sse-kms", "aws:kms",
+ * "aes256" (case-insensitive)
* @param kmsKeyId The KMS key ID (required for SSE-KMS, ignored for other
types)
* @return The encryption configuration
* @throws IllegalArgumentException if the encryption type is invalid
*/
public static S3EncryptionConfig fromConfig(
- @Nullable String encryptionTypeStr, @Nullable String kmsKeyId) {
- if (encryptionTypeStr == null
- || encryptionTypeStr.isEmpty()
+ @Nullable String encryptionTypeStr,
+ @Nullable String kmsKeyId,
+ Map<String, String> encryptionContext) {
+ if (StringUtils.isNullOrWhitespaceOnly(encryptionTypeStr)
|| "none".equalsIgnoreCase(encryptionTypeStr)) {
return none();
}
- String normalizedType = encryptionTypeStr.toUpperCase().replace("-",
"_").replace(":", "_");
+ String normalizedType = encryptionTypeStr.toLowerCase(Locale.ROOT);
switch (normalizedType) {
- case "SSE_S3":
- case "AES256":
+ case "sse-s3":
+ case "aes256":
return sseS3();
- case "SSE_KMS":
- case "AWS_KMS":
- return kmsKeyId != null && !kmsKeyId.isEmpty() ?
sseKms(kmsKeyId) : sseKms();
+ case "sse-kms":
+ case "aws:kms":
+ return sseKms(kmsKeyId, encryptionContext);
default:
throw new IllegalArgumentException(
"Unknown encryption type: "
@@ -220,6 +201,28 @@ public class S3EncryptionConfig implements Serializable {
}
}
+ public String serializeEncryptionContext() {
+ StringBuilder json = new StringBuilder("{");
+ boolean first = true;
+ for (Map.Entry<String, String> entry : encryptionContext.entrySet()) {
+ if (!first) {
+ json.append(",");
+ }
+ json.append("\"")
+ .append(escapeJson(entry.getKey()))
+ .append("\":\"")
+ .append(escapeJson(entry.getValue()))
+ .append("\"");
+ first = false;
+ }
+ json.append("}");
+ return
Base64.getEncoder().encodeToString(json.toString().getBytes(StandardCharsets.UTF_8));
+ }
+
+ private String escapeJson(String value) {
+ return value.replace("\\", "\\\\").replace("\"", "\\\"");
+ }
+
@Override
public String toString() {
StringBuilder sb = new
StringBuilder("S3EncryptionConfig{type=").append(encryptionType);
diff --git
a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3ObjectOperations.java
b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3ObjectOperations.java
index 01bc660f3ac..14ff1eeab12 100644
---
a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3ObjectOperations.java
+++
b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3ObjectOperations.java
@@ -85,7 +85,6 @@ import java.util.stream.Collectors;
* <ul>
* <li>SSE-C (customer-provided keys) via a KeyProvider interface
* <li>Client-side encryption via an EncryptionHandler interface
- * <li>Encryption context for SSE-KMS (see HADOOP-19197)
* </ul>
*
* <p><b>S3 URI Handling:</b> The {@link #extractKey(Path)} and {@link
#extractBucketName(Path)}
@@ -155,7 +154,7 @@ public class NativeS3ObjectOperations {
}
if (encryptionConfig.hasEncryptionContext()) {
requestBuilder.ssekmsEncryptionContext(
-
serializeEncryptionContext(encryptionConfig.getEncryptionContext()));
+ encryptionConfig.serializeEncryptionContext());
}
}
}
@@ -171,36 +170,11 @@ public class NativeS3ObjectOperations {
}
if (encryptionConfig.hasEncryptionContext()) {
requestBuilder.ssekmsEncryptionContext(
-
serializeEncryptionContext(encryptionConfig.getEncryptionContext()));
+ encryptionConfig.serializeEncryptionContext());
}
}
}
- /**
- * Serializes the encryption context map to a Base64-encoded JSON string
as required by S3 API.
- */
- private String serializeEncryptionContext(java.util.Map<String, String>
context) {
- StringBuilder json = new StringBuilder("{");
- boolean first = true;
- for (java.util.Map.Entry<String, String> entry : context.entrySet()) {
- if (!first) {
- json.append(",");
- }
- json.append("\"")
- .append(escapeJson(entry.getKey()))
- .append("\":\"")
- .append(escapeJson(entry.getValue()))
- .append("\"");
- first = false;
- }
- json.append("}");
- return
java.util.Base64.getEncoder().encodeToString(json.toString().getBytes());
- }
-
- private String escapeJson(String value) {
- return value.replace("\\", "\\\\").replace("\"", "\\\"");
- }
-
public UploadPartResult uploadPart(
String key, String uploadId, int partNumber, File inputFile, long
length)
throws IOException {
@@ -275,9 +249,8 @@ public class NativeS3ObjectOperations {
}
if
(encryptionConfig.hasEncryptionContext()) {
req.ssekmsEncryptionContext(
-
serializeEncryptionContext(
-
encryptionConfig
-
.getEncryptionContext()));
+ encryptionConfig
+
.serializeEncryptionContext());
}
}
}
diff --git
a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3BlockLocationTest.java
b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3BlockLocationTest.java
new file mode 100644
index 00000000000..3564a3a3015
--- /dev/null
+++
b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3BlockLocationTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3native;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link S3BlockLocation}. */
+class S3BlockLocationTest {
+
+ @Test
+ void testAccessors() {
+ S3BlockLocation loc = new S3BlockLocation(new String[] {"localhost"},
100L, 512L);
+
+ assertThat(loc.getHosts()).containsExactly("localhost");
+ assertThat(loc.getOffset()).isEqualTo(100L);
+ assertThat(loc.getLength()).isEqualTo(512L);
+ }
+
+ @Test
+ void testCompareToOrdersByOffset() {
+ S3BlockLocation first = new S3BlockLocation(new String[]
{"localhost"}, 0L, 100L);
+ S3BlockLocation second = new S3BlockLocation(new String[]
{"localhost"}, 100L, 100L);
+
+ assertThat(first.compareTo(second)).isNegative();
+ assertThat(second.compareTo(first)).isPositive();
+ assertThat(first.compareTo(first)).isZero();
+ }
+}
diff --git
a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3EncryptionConfigTest.java
b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3EncryptionConfigTest.java
new file mode 100644
index 00000000000..cfda10a1452
--- /dev/null
+++
b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3EncryptionConfigTest.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3native;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import software.amazon.awssdk.services.s3.model.ServerSideEncryption;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import static
org.apache.flink.fs.s3native.S3EncryptionConfig.EncryptionType.NONE;
+import static
org.apache.flink.fs.s3native.S3EncryptionConfig.EncryptionType.SSE_KMS;
+import static
org.apache.flink.fs.s3native.S3EncryptionConfig.EncryptionType.SSE_S3;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link S3EncryptionConfig}. */
+class S3EncryptionConfigTest {
+
+ @Test
+ void sseKms_withContext_contextStoredDefensively() {
+ Map<String, String> ctx =
+ new HashMap<>(Map.of("aws:s3:arn",
"arn:aws:s3:::my-bucket/my-file"));
+ S3EncryptionConfig c = S3EncryptionConfig.sseKms("key-id", ctx);
+ ctx.put("extra", "value");
+
+ assertThat(c.getEncryptionContext())
+ .isEqualTo(Map.of("aws:s3:arn",
"arn:aws:s3:::my-bucket/my-file"));
+ assertThat(c.hasEncryptionContext()).isTrue();
+ }
+
+ @Test
+ void sseKms_nullKeyId_keyIdIsNull() {
+ S3EncryptionConfig c = S3EncryptionConfig.sseKms(null,
Collections.emptyMap());
+
+ assertThat(c.getKmsKeyId()).isNull();
+ }
+
+ @Test
+ void sseKms_returnedContext_isUnmodifiable() {
+ S3EncryptionConfig c =
+ S3EncryptionConfig.sseKms(
+ "key-id", Map.of("aws:s3:arn",
"arn:aws:s3:::my-bucket/my-file"));
+
+ assertThatThrownBy(() -> c.getEncryptionContext().put("x", "y"))
+ .isInstanceOf(UnsupportedOperationException.class);
+ }
+
+ @ParameterizedTest
+ @MethodSource
+ void sseKms_contextAbsent_contextIsEmpty(Map<String, String> context) {
+ S3EncryptionConfig c = S3EncryptionConfig.sseKms("key-id", context);
+
+ assertThat(c.getEncryptionContext()).isEmpty();
+ assertThat(c.hasEncryptionContext()).isFalse();
+ }
+
+ static Stream<Arguments> sseKms_contextAbsent_contextIsEmpty() {
+ return Stream.of(Arguments.of((Object) null),
Arguments.of(Collections.emptyMap()));
+ }
+
+ @ParameterizedTest
+ @MethodSource
+ void getServerSideEncryption_allTypes_returnsCorrectSseValue(
+ String configType, ServerSideEncryption expected) {
+ S3EncryptionConfig c =
+ S3EncryptionConfig.fromConfig(configType, null,
Collections.emptyMap());
+
+ assertThat(c.getServerSideEncryption()).isEqualTo(expected);
+ }
+
+ static Stream<Arguments>
getServerSideEncryption_allTypes_returnsCorrectSseValue() {
+ return Stream.of(
+ Arguments.of(null, null),
+ Arguments.of("sse-s3", ServerSideEncryption.AES256),
+ Arguments.of("sse-kms", ServerSideEncryption.AWS_KMS));
+ }
+
+ @ParameterizedTest
+ @MethodSource
+ void fromConfig_typeVariants_returnExpectedType(
+ String input, S3EncryptionConfig.EncryptionType expected) {
+ assertThat(
+ S3EncryptionConfig.fromConfig(input, null,
Collections.emptyMap())
+ .getEncryptionType())
+ .isEqualTo(expected);
+ }
+
+ static Stream<Arguments> fromConfig_typeVariants_returnExpectedType() {
+ return Stream.of(
+ Arguments.of(null, NONE),
+ Arguments.of("", NONE),
+ Arguments.of("none", NONE),
+ Arguments.of("NONE", NONE),
+ Arguments.of(" ", NONE),
+ Arguments.of("sse-s3", SSE_S3),
+ Arguments.of("AES256", SSE_S3),
+ Arguments.of("sse-kms", SSE_KMS),
+ Arguments.of("aws:kms", SSE_KMS));
+ }
+
+ @Test
+ void fromConfig_sseKmsWithKeyAndContext_keyAndContextPreserved() {
+ S3EncryptionConfig result =
+ S3EncryptionConfig.fromConfig(
+ "sse-kms",
+ "my-key",
+ Map.of("aws:s3:arn",
"arn:aws:s3:::my-bucket/my-file"));
+
+ assertThat(result.getKmsKeyId()).isEqualTo("my-key");
+ assertThat(result.getEncryptionContext())
+ .isEqualTo(Map.of("aws:s3:arn",
"arn:aws:s3:::my-bucket/my-file"));
+ }
+
+ @ParameterizedTest
+ @MethodSource
+ void fromConfig_sseKmsWithNoKeyId_keyIdIsNull(String kmsKeyId) {
+ assertThat(
+ S3EncryptionConfig.fromConfig("sse-kms", kmsKeyId,
Collections.emptyMap())
+ .getKmsKeyId())
+ .isNull();
+ }
+
+ static Stream<Arguments> fromConfig_sseKmsWithNoKeyId_keyIdIsNull() {
+ return Stream.of(Arguments.of((Object) null), Arguments.of(""));
+ }
+
+ @Test
+ void fromConfig_sseKmsDefaultKeyWithContext_contextPreserved() {
+ assertThat(
+ S3EncryptionConfig.fromConfig(
+ "sse-kms",
+ null,
+ Map.of("aws:s3:arn",
"arn:aws:s3:::my-bucket/my-file"))
+ .hasEncryptionContext())
+ .isTrue();
+ }
+
+ @Test
+ void fromConfig_sseS3WithContext_contextIgnored() {
+ S3EncryptionConfig c =
+ S3EncryptionConfig.fromConfig(
+ "sse-s3", null, Map.of("aws:s3:arn",
"arn:aws:s3:::my-bucket/my-file"));
+
+ assertThat(c.getEncryptionType()).isEqualTo(SSE_S3);
+ assertThat(c.getEncryptionContext()).isEmpty();
+ }
+
+ @Test
+ void fromConfig_sseS3_kmsKeyIdIgnored() {
+ S3EncryptionConfig c =
+ S3EncryptionConfig.fromConfig("sse-s3", "some-key",
Collections.emptyMap());
+
+ assertThat(c.getKmsKeyId()).isNull();
+ }
+
+ @Test
+ void fromConfig_unknownType_throwsIllegalArgument() {
+ assertThatThrownBy(
+ () ->
+ S3EncryptionConfig.fromConfig(
+ "invalid-type", null,
Collections.emptyMap()))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("invalid-type");
+ }
+
+ @ParameterizedTest
+ @MethodSource
+ void isEnabled_encryptionType_returnsCorrectState(S3EncryptionConfig
config, boolean expected) {
+ assertThat(config.isEnabled()).isEqualTo(expected);
+ }
+
+ static Stream<Arguments> isEnabled_encryptionType_returnsCorrectState() {
+ return Stream.of(
+ Arguments.of(
+ S3EncryptionConfig.fromConfig(null, null,
Collections.emptyMap()), false),
+ Arguments.of(
+ S3EncryptionConfig.fromConfig("sse-s3", null,
Collections.emptyMap()),
+ true),
+ Arguments.of(
+ S3EncryptionConfig.fromConfig("sse-kms", null,
Collections.emptyMap()),
+ true));
+ }
+
+ @ParameterizedTest
+ @MethodSource
+ void serializeEncryptionContext_exactOutput_correctBase64Json(
+ Map<String, String> context, String expectedDecoded) {
+ S3EncryptionConfig c = S3EncryptionConfig.sseKms(null, context);
+ String decoded = new
String(Base64.getDecoder().decode(c.serializeEncryptionContext()));
+
+ assertThat(decoded).isEqualTo(expectedDecoded);
+ }
+
+ static Stream<Arguments>
serializeEncryptionContext_exactOutput_correctBase64Json() {
+ return Stream.of(
+ Arguments.of(Collections.emptyMap(), "{}"),
+ Arguments.of(Map.of("k", "v"), "{\"k\":\"v\"}"));
+ }
+
+ @Test
+ void serializeEncryptionContext_multipleEntries_allEntriesPresent() {
+ S3EncryptionConfig c = S3EncryptionConfig.sseKms(null, Map.of("k1",
"v1", "k2", "v2"));
+ String decoded = new
String(Base64.getDecoder().decode(c.serializeEncryptionContext()));
+
+ assertThat(decoded).contains("\"k1\":\"v1\"", "\"k2\":\"v2\"");
+ }
+
+ @ParameterizedTest
+ @MethodSource
+ void serializeEncryptionContext_jsonSpecialChars_escapedCorrectly(
+ String key, String value, String expectedFragment) {
+ S3EncryptionConfig c = S3EncryptionConfig.sseKms(null, Map.of(key,
value));
+ String decoded = new
String(Base64.getDecoder().decode(c.serializeEncryptionContext()));
+
+ assertThat(decoded).contains(expectedFragment);
+ }
+
+ static Stream<Arguments>
serializeEncryptionContext_jsonSpecialChars_escapedCorrectly() {
+ return Stream.of(
+ Arguments.of("k", "val\"ue", "\"k\":\"val\\\"ue\""),
+ Arguments.of("k", "val\\ue", "\"k\":\"val\\\\ue\""),
+ Arguments.of("k\"ey", "v", "\"k\\\"ey\":\"v\""),
+ Arguments.of("k\\ey", "v", "\"k\\\\ey\":\"v\""),
+ Arguments.of("k", "\\\"", "\"k\":\"\\\\\\\"\""));
+ }
+
+ @ParameterizedTest
+ @MethodSource
+ void serialization_roundTrip_preservesAllFields(S3EncryptionConfig config)
throws Exception {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ new ObjectOutputStream(bos).writeObject(config);
+ S3EncryptionConfig copy =
+ (S3EncryptionConfig)
+ new ObjectInputStream(new
ByteArrayInputStream(bos.toByteArray()))
+ .readObject();
+
+
assertThat(copy.getEncryptionType()).isEqualTo(config.getEncryptionType());
+ assertThat(copy.getKmsKeyId()).isEqualTo(config.getKmsKeyId());
+
assertThat(copy.getEncryptionContext()).isEqualTo(config.getEncryptionContext());
+ }
+
+ static Stream<Arguments> serialization_roundTrip_preservesAllFields() {
+ return Stream.of(
+ Arguments.of(S3EncryptionConfig.sseKms("key-id", Map.of("k",
"v"))),
+ Arguments.of(S3EncryptionConfig.none()),
+ Arguments.of(S3EncryptionConfig.sseS3()),
+ Arguments.of(S3EncryptionConfig.sseKms(null, null)));
+ }
+}
diff --git
a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3ExceptionUtilsTest.java
b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3ExceptionUtilsTest.java
new file mode 100644
index 00000000000..c60fd422c6e
--- /dev/null
+++
b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3ExceptionUtilsTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3native;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import software.amazon.awssdk.awscore.exception.AwsErrorDetails;
+import software.amazon.awssdk.awscore.exception.AwsServiceException;
+import software.amazon.awssdk.services.s3.model.S3Exception;
+
+import java.io.IOException;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link S3ExceptionUtils}. */
+class S3ExceptionUtilsTest {
+
+ static Stream<Arguments> toIoExceptionCases() {
+ return Stream.of(
+ Arguments.of(
+ s3Exception(
+ 404,
+ AwsErrorDetails.builder().errorMessage("key
not found").build()),
+ "delete object",
+ "delete object (HTTP 404: key not found)"),
+ Arguments.of(
+ s3Exception(500, "internal error"),
+ "put object",
+ "put object (HTTP 500: internal error)"));
+ }
+
+ @ParameterizedTest
+ @MethodSource("toIoExceptionCases")
+ void toIOException_contextAndStatusCode_formatsMessageAndPreservesCause(
+ S3Exception cause, String context, String expectedMessage) {
+ IOException result = S3ExceptionUtils.toIOException(context, cause);
+
+ assertThat(result.getMessage()).contains(expectedMessage);
+ assertThat(result.getCause()).isSameAs(cause);
+ }
+
+ static Stream<Arguments> errorMessageExactCases() {
+ return Stream.of(
+ Arguments.of(
+ s3Exception(
+ 404,
+ AwsErrorDetails.builder()
+ .errorMessage("The specified key does
not exist.")
+ .build()),
+ "The specified key does not exist."),
+ Arguments.of(s3ExceptionStatusOnly(500), "Unknown S3 error"));
+ }
+
+ @ParameterizedTest
+ @MethodSource("errorMessageExactCases")
+ void errorMessage_exactReturn_matchesExpected(S3Exception e, String
expected) {
+ assertThat(S3ExceptionUtils.errorMessage(e)).isEqualTo(expected);
+ }
+
+ static Stream<Arguments> errorMessageFallbackCases() {
+ return Stream.of(
+ Arguments.of(s3Exception(500, "connection timeout"),
"connection timeout"),
+ Arguments.of(
+ s3ExceptionWithMessageAndDetails(
+ 500,
+ "fallback message",
+
AwsErrorDetails.builder().errorCode("InternalError").build()),
+ "fallback message"));
+ }
+
+ @ParameterizedTest
+ @MethodSource("errorMessageFallbackCases")
+ void errorMessage_fallbackToExceptionMessage_containsExpected(
+ S3Exception e, String expectedMessage) {
+ assertThat(S3ExceptionUtils.errorMessage(e)).contains(expectedMessage);
+ }
+
+ static Stream<Arguments> errorCodeAllCases() {
+ return Stream.of(
+ Arguments.of(
+ s3Exception(404,
AwsErrorDetails.builder().errorCode("NoSuchKey").build()),
+ "NoSuchKey"),
+ Arguments.of(s3Exception(500, "some error"), "Unknown"),
+ Arguments.of(
+ s3Exception(
+ 500,
+ AwsErrorDetails.builder()
+ .errorMessage("Something went wrong")
+ .build()),
+ "Unknown"));
+ }
+
+ @ParameterizedTest
+ @MethodSource("errorCodeAllCases")
+ void errorCode_allCases_returnsExpected(S3Exception e, String expected) {
+ assertThat(S3ExceptionUtils.errorCode(e)).isEqualTo(expected);
+ }
+
+ private static AwsServiceException s3Exception(int statusCode,
AwsErrorDetails details) {
+ return
S3Exception.builder().statusCode(statusCode).awsErrorDetails(details).build();
+ }
+
+ private static AwsServiceException s3Exception(int statusCode, String
message) {
+ return
S3Exception.builder().statusCode(statusCode).message(message).build();
+ }
+
+ private static AwsServiceException s3ExceptionWithMessageAndDetails(
+ int statusCode, String message, AwsErrorDetails details) {
+ return S3Exception.builder()
+ .statusCode(statusCode)
+ .message(message)
+ .awsErrorDetails(details)
+ .build();
+ }
+
+ private static AwsServiceException s3ExceptionStatusOnly(int statusCode) {
+ return S3Exception.builder().statusCode(statusCode).build();
+ }
+}
diff --git
a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3FileStatusTest.java
b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3FileStatusTest.java
new file mode 100644
index 00000000000..aab96258989
--- /dev/null
+++
b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3FileStatusTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3native;
+
+import org.apache.flink.core.fs.Path;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link S3FileStatus}. */
+class S3FileStatusTest {
+
+ private static final Path PATH = new Path("s3://bucket/key");
+
+ static Stream<Arguments> fileSizeAndModTime() {
+ return Stream.of(
+ Arguments.of(0L, 0L),
+ Arguments.of(1024L, 987654321L),
+ Arguments.of(Long.MAX_VALUE, Long.MAX_VALUE));
+ }
+
+ @ParameterizedTest
+ @MethodSource("fileSizeAndModTime")
+ void withFile_variousSizes_allFieldsSetCorrectly(long size, long modTime) {
+ S3FileStatus s = S3FileStatus.withFile(size, modTime, PATH);
+
+ assertThat(s.getLen()).isEqualTo(size);
+ assertThat(s.getBlockSize()).isEqualTo(size);
+ assertThat(s.getModificationTime()).isEqualTo(modTime);
+ assertThat(s.getAccessTime()).isEqualTo(0L);
+ assertThat(s.isDir()).isFalse();
+ assertThat(s.getPath()).isEqualTo(PATH);
+ assertThat(s.getReplication()).isEqualTo((short) 1);
+ }
+
+ @Test
+ void withDirectory_anyPath_allFieldsSetCorrectly() {
+ S3FileStatus s = S3FileStatus.withDirectory(PATH);
+
+ assertThat(s.getLen()).isEqualTo(0L);
+ assertThat(s.getBlockSize()).isEqualTo(0L);
+ assertThat(s.getModificationTime()).isEqualTo(0L);
+ assertThat(s.getAccessTime()).isEqualTo(0L);
+ assertThat(s.isDir()).isTrue();
+ assertThat(s.getPath()).isEqualTo(PATH);
+ assertThat(s.getReplication()).isEqualTo((short) 1);
+ }
+
+ @Test
+ void constructor_nonZeroAccessTime_accessTimeIsPreserved() {
+ S3FileStatus s = new S3FileStatus(1024L, 1024L, 100L, 999L, false,
PATH);
+
+ assertThat(s.getAccessTime()).isEqualTo(999L);
+ }
+}