This is an automated email from the ASF dual-hosted git repository.
jackye pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 4850b622c7 AWS: Support S3 directory bucket listing (#11021)
4850b622c7 is described below
commit 4850b622c778deb4b234880bfd7643070e0a5458
Author: stubz151 <[email protected]>
AuthorDate: Thu Oct 24 04:45:00 2024 +0100
AWS: Support S3 directory bucket listing (#11021)
---
.../org/apache/iceberg/aws/AwsIntegTestUtil.java | 47 +++++++++-
.../org/apache/iceberg/aws/glue/GlueTestBase.java | 2 +-
.../iceberg/aws/s3/TestS3FileIOIntegration.java | 73 +++++++++++----
.../iceberg/aws/s3/TestS3MultipartUpload.java | 2 +-
.../java/org/apache/iceberg/aws/s3/S3FileIO.java | 8 +-
.../apache/iceberg/aws/s3/S3FileIOProperties.java | 37 ++++++++
.../main/java/org/apache/iceberg/aws/s3/S3URI.java | 33 +++++++
.../apache/iceberg/aws/TestS3FileIOProperties.java | 8 ++
.../org/apache/iceberg/aws/s3/TestS3FileIO.java | 103 ++++++++++++++++++++-
.../java/org/apache/iceberg/aws/s3/TestS3URI.java | 43 +++++++++
10 files changed, 333 insertions(+), 23 deletions(-)
diff --git
a/aws/src/integration/java/org/apache/iceberg/aws/AwsIntegTestUtil.java
b/aws/src/integration/java/org/apache/iceberg/aws/AwsIntegTestUtil.java
index e9cf474add..6b57cfd682 100644
--- a/aws/src/integration/java/org/apache/iceberg/aws/AwsIntegTestUtil.java
+++ b/aws/src/integration/java/org/apache/iceberg/aws/AwsIntegTestUtil.java
@@ -32,9 +32,11 @@ import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.Delete;
import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
import software.amazon.awssdk.services.s3.model.ListObjectVersionsRequest;
+import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
import software.amazon.awssdk.services.s3.model.ObjectVersion;
import
software.amazon.awssdk.services.s3.paginators.ListObjectVersionsIterable;
+import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Iterable;
import software.amazon.awssdk.services.s3control.S3ControlClient;
import
software.amazon.awssdk.services.s3control.model.CreateAccessPointRequest;
import
software.amazon.awssdk.services.s3control.model.DeleteAccessPointRequest;
@@ -42,6 +44,7 @@ import
software.amazon.awssdk.services.s3control.model.DeleteAccessPointRequest;
public class AwsIntegTestUtil {
private static final Logger LOG =
LoggerFactory.getLogger(AwsIntegTestUtil.class);
+ private static final int BATCH_DELETION_SIZE = 1000;
private AwsIntegTestUtil() {}
@@ -106,17 +109,16 @@ public class AwsIntegTestUtil {
return System.getenv("AWS_TEST_MULTI_REGION_ACCESS_POINT_ALIAS");
}
- public static void cleanS3Bucket(S3Client s3, String bucketName, String
prefix) {
+ public static void cleanS3GeneralPurposeBucket(S3Client s3, String
bucketName, String prefix) {
ListObjectVersionsIterable response =
s3.listObjectVersionsPaginator(
ListObjectVersionsRequest.builder().bucket(bucketName).prefix(prefix).build());
List<ObjectVersion> versionsToDelete = Lists.newArrayList();
- int batchDeletionSize = 1000;
response.versions().stream()
.forEach(
version -> {
versionsToDelete.add(version);
- if (versionsToDelete.size() == batchDeletionSize) {
+ if (versionsToDelete.size() == BATCH_DELETION_SIZE) {
deleteObjectVersions(s3, bucketName, versionsToDelete);
versionsToDelete.clear();
}
@@ -127,6 +129,45 @@ public class AwsIntegTestUtil {
}
}
+ /**
+ * Cleans up an S3 directory bucket, which does not support object versioning.
+ *
+ * @param s3 an instance of S3Client to be used to list/delete objects
+ * @param bucketName name of the bucket
+ * @param prefix the path prefix we want to remove
+ */
+ public static void cleanS3DirectoryBucket(S3Client s3, String bucketName,
String prefix) {
+ String newPrefix = prefix.endsWith("/") ? prefix : prefix + "/";
+ ListObjectsV2Request listRequest =
+
ListObjectsV2Request.builder().bucket(bucketName).prefix(newPrefix).build();
+
+ ListObjectsV2Iterable paginatedListResponse =
s3.listObjectsV2Paginator(listRequest);
+ List<ObjectIdentifier> objectsToDelete = Lists.newArrayList();
+
+ paginatedListResponse.contents().stream()
+ .forEach(
+ s3Object -> {
+ if (objectsToDelete.size() == BATCH_DELETION_SIZE) {
+ deleteObjects(s3, bucketName, objectsToDelete);
+ objectsToDelete.clear();
+ }
+
objectsToDelete.add(ObjectIdentifier.builder().key(s3Object.key()).build());
+ });
+
+ if (!objectsToDelete.isEmpty()) {
+ deleteObjects(s3, bucketName, objectsToDelete);
+ }
+ }
+
+ private static void deleteObjects(
+ S3Client s3, String bucketName, List<ObjectIdentifier> objectsToDelete) {
+ s3.deleteObjects(
+ DeleteObjectsRequest.builder()
+ .bucket(bucketName)
+ .delete(Delete.builder().objects(objectsToDelete).build())
+ .build());
+ }
+
private static void deleteObjectVersions(
S3Client s3, String bucket, List<ObjectVersion> objectVersions) {
s3.deleteObjects(
diff --git
a/aws/src/integration/java/org/apache/iceberg/aws/glue/GlueTestBase.java
b/aws/src/integration/java/org/apache/iceberg/aws/glue/GlueTestBase.java
index 29076369c8..65e37eba4c 100644
--- a/aws/src/integration/java/org/apache/iceberg/aws/glue/GlueTestBase.java
+++ b/aws/src/integration/java/org/apache/iceberg/aws/glue/GlueTestBase.java
@@ -110,7 +110,7 @@ public class GlueTestBase {
@AfterAll
public static void afterClass() {
AwsIntegTestUtil.cleanGlueCatalog(GLUE, NAMESPACES);
- AwsIntegTestUtil.cleanS3Bucket(S3, TEST_BUCKET_NAME, TEST_PATH_PREFIX);
+ AwsIntegTestUtil.cleanS3GeneralPurposeBucket(S3, TEST_BUCKET_NAME,
TEST_PATH_PREFIX);
}
public static String getRandomName() {
diff --git
a/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3FileIOIntegration.java
b/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3FileIOIntegration.java
index 41a07401a1..9d5d41438a 100644
---
a/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3FileIOIntegration.java
+++
b/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3FileIOIntegration.java
@@ -108,25 +108,32 @@ public class TestS3FileIOIntegration {
content = new String(contentBytes, StandardCharsets.UTF_8);
kmsKeyArn = kms.createKey().keyMetadata().arn();
- AwsIntegTestUtil.createAccessPoint(s3Control, accessPointName, bucketName);
- AwsIntegTestUtil.createAccessPoint(
- crossRegionS3Control, crossRegionAccessPointName,
crossRegionBucketName);
- multiRegionAccessPointAlias =
AwsIntegTestUtil.testMultiRegionAccessPointAlias();
- s3.putBucketVersioning(
- PutBucketVersioningRequest.builder()
- .bucket(bucketName)
- .versioningConfiguration(
-
VersioningConfiguration.builder().status(BucketVersioningStatus.ENABLED).build())
- .build());
+ if (!S3URI.isS3DirectoryBucket(bucketName)) {
+ s3.putBucketVersioning(
+ PutBucketVersioningRequest.builder()
+ .bucket(bucketName)
+ .versioningConfiguration(
+
VersioningConfiguration.builder().status(BucketVersioningStatus.ENABLED).build())
+ .build());
+ AwsIntegTestUtil.createAccessPoint(s3Control, accessPointName,
bucketName);
+ AwsIntegTestUtil.createAccessPoint(
+ crossRegionS3Control, crossRegionAccessPointName,
crossRegionBucketName);
+ multiRegionAccessPointAlias =
AwsIntegTestUtil.testMultiRegionAccessPointAlias();
+ }
}
@AfterAll
public static void afterClass() {
- AwsIntegTestUtil.cleanS3Bucket(s3, bucketName, prefix);
- AwsIntegTestUtil.deleteAccessPoint(s3Control, accessPointName);
- AwsIntegTestUtil.deleteAccessPoint(crossRegionS3Control,
crossRegionAccessPointName);
- kms.scheduleKeyDeletion(
-
ScheduleKeyDeletionRequest.builder().keyId(kmsKeyArn).pendingWindowInDays(7).build());
+ if (S3URI.isS3DirectoryBucket(bucketName)) {
+ S3FileIO s3FileIO = new S3FileIO(clientFactory::s3);
+ AwsIntegTestUtil.cleanS3DirectoryBucket(s3FileIO.client(), bucketName,
prefix);
+ } else {
+ AwsIntegTestUtil.cleanS3GeneralPurposeBucket(s3, bucketName, prefix);
+ AwsIntegTestUtil.deleteAccessPoint(s3Control, accessPointName);
+ AwsIntegTestUtil.deleteAccessPoint(crossRegionS3Control,
crossRegionAccessPointName);
+ kms.scheduleKeyDeletion(
+
ScheduleKeyDeletionRequest.builder().keyId(kmsKeyArn).pendingWindowInDays(7).build());
+ }
}
@BeforeEach
@@ -171,6 +178,7 @@ public class TestS3FileIOIntegration {
@Test
public void testNewInputStreamWithAccessPoint() throws Exception {
+ requireAccessPointSupport();
s3.putObject(
PutObjectRequest.builder().bucket(bucketName).key(objectKey).build(),
RequestBody.fromBytes(contentBytes));
@@ -201,12 +209,14 @@ public class TestS3FileIOIntegration {
S3FileIO s3FileIO = new S3FileIO(clientFactory::s3);
validateRead(s3FileIO, crossBucketObjectUri);
} finally {
- AwsIntegTestUtil.cleanS3Bucket(s3Client, crossRegionBucketName,
crossBucketObjectKey);
+ AwsIntegTestUtil.cleanS3GeneralPurposeBucket(
+ s3Client, crossRegionBucketName, crossBucketObjectKey);
}
}
@Test
public void testNewInputStreamWithCrossRegionAccessPoint() throws Exception {
+ requireAccessPointSupport();
clientFactory.initialize(ImmutableMap.of(S3FileIOProperties.USE_ARN_REGION_ENABLED,
"true"));
S3Client s3Client = clientFactory.s3();
s3Client.putObject(
@@ -258,6 +268,7 @@ public class TestS3FileIOIntegration {
@Test
public void testNewOutputStreamWithAccessPoint() throws Exception {
+ requireAccessPointSupport();
S3FileIO s3FileIO = new S3FileIO(clientFactory::s3);
s3FileIO.initialize(
ImmutableMap.of(
@@ -273,6 +284,7 @@ public class TestS3FileIOIntegration {
@Test
public void testNewOutputStreamWithCrossRegionAccessPoint() throws Exception
{
+ requireAccessPointSupport();
clientFactory.initialize(ImmutableMap.of(S3FileIOProperties.USE_ARN_REGION_ENABLED,
"true"));
S3Client s3Client = clientFactory.s3();
S3FileIO s3FileIO = new S3FileIO(clientFactory::s3);
@@ -327,6 +339,7 @@ public class TestS3FileIOIntegration {
@Test
public void testServerSideKmsEncryption() throws Exception {
+ requireKMSEncryptionSupport();
S3FileIOProperties properties = new S3FileIOProperties();
properties.setSseType(S3FileIOProperties.SSE_TYPE_KMS);
properties.setSseKey(kmsKeyArn);
@@ -342,6 +355,7 @@ public class TestS3FileIOIntegration {
@Test
public void testServerSideKmsEncryptionWithDefaultKey() throws Exception {
+ requireKMSEncryptionSupport();
S3FileIOProperties properties = new S3FileIOProperties();
properties.setSseType(S3FileIOProperties.SSE_TYPE_KMS);
S3FileIO s3FileIO = new S3FileIO(clientFactory::s3, properties);
@@ -363,6 +377,7 @@ public class TestS3FileIOIntegration {
@Test
public void testDualLayerServerSideKmsEncryption() throws Exception {
+ requireKMSEncryptionSupport();
S3FileIOProperties properties = new S3FileIOProperties();
properties.setSseType(S3FileIOProperties.DSSE_TYPE_KMS);
properties.setSseKey(kmsKeyArn);
@@ -378,6 +393,7 @@ public class TestS3FileIOIntegration {
@Test
public void testServerSideCustomEncryption() throws Exception {
+ requireKMSEncryptionSupport();
// generate key
KeyGenerator keyGenerator = KeyGenerator.getInstance("AES");
keyGenerator.init(256, new SecureRandom());
@@ -413,6 +429,7 @@ public class TestS3FileIOIntegration {
@Test
public void testACL() throws Exception {
+ requireACLSupport();
S3FileIOProperties properties = new S3FileIOProperties();
properties.setAcl(ObjectCannedACL.BUCKET_OWNER_FULL_CONTROL);
S3FileIO s3FileIO = new S3FileIO(clientFactory::s3, properties);
@@ -444,6 +461,7 @@ public class TestS3FileIOIntegration {
@Test
public void testDeleteFilesMultipleBatchesWithAccessPoints() throws
Exception {
+ requireAccessPointSupport();
S3FileIO s3FileIO = new S3FileIO(clientFactory::s3,
getDeletionTestProperties());
s3FileIO.initialize(
ImmutableMap.of(
@@ -454,6 +472,7 @@ public class TestS3FileIOIntegration {
@Test
public void testDeleteFilesMultipleBatchesWithCrossRegionAccessPoints()
throws Exception {
+ requireKMSEncryptionSupport();
clientFactory.initialize(ImmutableMap.of(S3FileIOProperties.USE_ARN_REGION_ENABLED,
"true"));
S3FileIO s3FileIO = new S3FileIO(clientFactory::s3,
getDeletionTestProperties());
s3FileIO.initialize(
@@ -515,6 +534,7 @@ public class TestS3FileIOIntegration {
@Test
public void testFileRecoveryHappyPath() throws Exception {
+ requireVersioningSupport();
S3FileIO s3FileIO = new S3FileIO(clientFactory::s3, new
S3FileIOProperties());
String filePath = String.format("s3://%s/%s/%s", bucketName, prefix,
"someFile.parquet");
write(s3FileIO, filePath);
@@ -527,6 +547,7 @@ public class TestS3FileIOIntegration {
@Test
public void testFileRecoveryFailsToRecover() throws Exception {
+ requireVersioningSupport();
S3FileIO s3FileIO = new S3FileIO(clientFactory::s3, new
S3FileIOProperties());
s3.putBucketVersioning(
PutBucketVersioningRequest.builder()
@@ -613,4 +634,24 @@ public class TestS3FileIOIntegration {
builder -> builder.bucket(s3URI.bucket()).key(s3URI.key()
+ i).build(),
RequestBody.empty()));
}
+
+ /** S3 Express doesn't support access points */
+ private void requireAccessPointSupport() {
+ Assumptions.assumeThat(S3URI.isS3DirectoryBucket(bucketName)).isFalse();
+ }
+
+ /** S3 Express doesn’t support KMS/custom encryption */
+ private void requireKMSEncryptionSupport() {
+ Assumptions.assumeThat(S3URI.isS3DirectoryBucket(bucketName)).isFalse();
+ }
+
+ /** S3 Express doesn't support versioning */
+ private void requireVersioningSupport() {
+ Assumptions.assumeThat(S3URI.isS3DirectoryBucket(bucketName)).isFalse();
+ }
+
+ /** File ACLs aren’t supported by S3 Express */
+ private void requireACLSupport() {
+ Assumptions.assumeThat(S3URI.isS3DirectoryBucket(bucketName)).isFalse();
+ }
}
diff --git
a/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3MultipartUpload.java
b/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3MultipartUpload.java
index 29d4c48927..901e9933b1 100644
---
a/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3MultipartUpload.java
+++
b/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3MultipartUpload.java
@@ -59,7 +59,7 @@ public class TestS3MultipartUpload {
@AfterAll
public static void afterClass() {
- AwsIntegTestUtil.cleanS3Bucket(s3, bucketName, prefix);
+ AwsIntegTestUtil.cleanS3GeneralPurposeBucket(s3, bucketName, prefix);
}
@BeforeEach
diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java
b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java
index 11a5ce0224..23b246c357 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java
@@ -297,7 +297,13 @@ public class S3FileIO implements CredentialSupplier,
DelegateFileIO, SupportsRec
@Override
public Iterable<FileInfo> listPrefix(String prefix) {
- S3URI s3uri = new S3URI(prefix,
s3FileIOProperties.bucketToAccessPointMapping());
+ S3URI uri = new S3URI(prefix,
s3FileIOProperties.bucketToAccessPointMapping());
+ if (uri.useS3DirectoryBucket()
+ && s3FileIOProperties.isS3DirectoryBucketListPrefixAsDirectory()) {
+ uri = uri.toDirectoryPath();
+ }
+
+ S3URI s3uri = uri;
ListObjectsV2Request request =
ListObjectsV2Request.builder().bucket(s3uri.bucket()).prefix(s3uri.key()).build();
diff --git
a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIOProperties.java
b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIOProperties.java
index 3a43880f31..5da758704a 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIOProperties.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIOProperties.java
@@ -428,6 +428,25 @@ public class S3FileIOProperties implements Serializable {
public static final long S3_RETRY_MAX_WAIT_MS_DEFAULT = 20_000; // 20 seconds
+ /**
+ * Controls whether to list prefixes as directories for S3 directory buckets. Defaults to true,
+ * in which case a trailing "/" is appended to the prefix.
+ *
+ * <p>Example: s3://bucket/prefix will be shown as s3://bucket/prefix/
+ *
+ * <p>For more details see delimiter section in:
+ *
https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html#API_ListObjectsV2_RequestSyntax
+ *
+ * <p>If set to false, listing a directory bucket prefix that does not end with "/" will fail.
+ * Turn off this feature if you are using S3FileIO.listPrefix for listing bucket prefixes that are
+ * not directories. This ensures correctness by failing the operation, per the S3 requirement,
+ * when listing against a non-directory prefix in a directory bucket.
+ */
+ public static final String S3_DIRECTORY_BUCKET_LIST_PREFIX_AS_DIRECTORY =
+ "s3.directory-bucket.list-prefix-as-directory";
+
+ public static final boolean
S3_DIRECTORY_BUCKET_LIST_PREFIX_AS_DIRECTORY_DEFAULT = true;
+
private String sseType;
private String sseKey;
private String sseMd5;
@@ -462,6 +481,8 @@ public class S3FileIOProperties implements Serializable {
private int s3RetryNumRetries;
private long s3RetryMinWaitMs;
private long s3RetryMaxWaitMs;
+
+ private boolean s3DirectoryBucketListPrefixAsDirectory;
private final Map<String, String> allProperties;
public S3FileIOProperties() {
@@ -498,6 +519,8 @@ public class S3FileIOProperties implements Serializable {
this.s3RetryNumRetries = S3_RETRY_NUM_RETRIES_DEFAULT;
this.s3RetryMinWaitMs = S3_RETRY_MIN_WAIT_MS_DEFAULT;
this.s3RetryMaxWaitMs = S3_RETRY_MAX_WAIT_MS_DEFAULT;
+ this.s3DirectoryBucketListPrefixAsDirectory =
+ S3_DIRECTORY_BUCKET_LIST_PREFIX_AS_DIRECTORY_DEFAULT;
this.allProperties = Maps.newHashMap();
ValidationException.check(
@@ -605,6 +628,11 @@ public class S3FileIOProperties implements Serializable {
PropertyUtil.propertyAsLong(properties, S3_RETRY_MIN_WAIT_MS,
S3_RETRY_MIN_WAIT_MS_DEFAULT);
this.s3RetryMaxWaitMs =
PropertyUtil.propertyAsLong(properties, S3_RETRY_MAX_WAIT_MS,
S3_RETRY_MAX_WAIT_MS_DEFAULT);
+ this.s3DirectoryBucketListPrefixAsDirectory =
+ PropertyUtil.propertyAsBoolean(
+ properties,
+ S3_DIRECTORY_BUCKET_LIST_PREFIX_AS_DIRECTORY,
+ S3_DIRECTORY_BUCKET_LIST_PREFIX_AS_DIRECTORY_DEFAULT);
ValidationException.check(
keyIdAccessKeyBothConfigured(),
@@ -837,6 +865,15 @@ public class S3FileIOProperties implements Serializable {
return (long) s3RetryNumRetries() * s3RetryMaxWaitMs();
}
+ public boolean isS3DirectoryBucketListPrefixAsDirectory() {
+ return s3DirectoryBucketListPrefixAsDirectory;
+ }
+
+ public void setS3DirectoryBucketListPrefixAsDirectory(
+ boolean s3DirectoryBucketListPrefixAsDirectory) {
+ this.s3DirectoryBucketListPrefixAsDirectory =
s3DirectoryBucketListPrefixAsDirectory;
+ }
+
private boolean keyIdAccessKeyBothConfigured() {
return (accessKeyId == null) == (secretAccessKey == null);
}
diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3URI.java
b/aws/src/main/java/org/apache/iceberg/aws/s3/S3URI.java
index 79b4e695de..9cfba5fca3 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3URI.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3URI.java
@@ -37,6 +37,9 @@ class S3URI {
private static final String QUERY_DELIM = "\\?";
private static final String FRAGMENT_DELIM = "#";
+ /** Suffix of S3Express storage bucket names. */
+ private static final String S3_DIRECTORY_BUCKET_SUFFIX = "--x-s3";
+
private final String location;
private final String scheme;
private final String bucket;
@@ -115,4 +118,34 @@ class S3URI {
public String toString() {
return location;
}
+
+ /**
+ * Converts the current S3URI to a directory path.
+ *
+ * <p>This method ensures that the S3URI represents a directory by adding a
"/" delimiter at the
+ * end of the prefix if it's not already present.
+ *
+ * @return an S3URI with the directory path configured
+ */
+ public S3URI toDirectoryPath() {
+ if (key.endsWith(PATH_DELIM)) {
+ return this;
+ }
+ return new S3URI(String.format("%s://%s/%s/", scheme, bucket, key));
+ }
+
+ public boolean useS3DirectoryBucket() {
+ return isS3DirectoryBucket(this.bucket);
+ }
+
+ /**
+ * Check if the bucket name indicates the bucket is a directory bucket. This
method does not check
+ * against the S3 service.
+ *
+ * @param bucket bucket to probe.
+ * @return true if the bucket name indicates the bucket is a directory bucket
+ */
+ public static boolean isS3DirectoryBucket(final String bucket) {
+ return bucket.endsWith(S3_DIRECTORY_BUCKET_SUFFIX);
+ }
}
diff --git
a/aws/src/test/java/org/apache/iceberg/aws/TestS3FileIOProperties.java
b/aws/src/test/java/org/apache/iceberg/aws/TestS3FileIOProperties.java
index e2499e9476..58332d4258 100644
--- a/aws/src/test/java/org/apache/iceberg/aws/TestS3FileIOProperties.java
+++ b/aws/src/test/java/org/apache/iceberg/aws/TestS3FileIOProperties.java
@@ -312,4 +312,12 @@ public class TestS3FileIOProperties {
s3Properties.applyS3AccessGrantsConfigurations(builder);
assertThat(builder.plugins().size()).isEqualTo(0);
}
+
+ @Test
+ public void testIsTreatS3DirectoryBucketListPrefixAsDirectoryEnabled() {
+ Map<String, String> map = Maps.newHashMap();
+ map.put(S3FileIOProperties.S3_DIRECTORY_BUCKET_LIST_PREFIX_AS_DIRECTORY,
"false");
+ S3FileIOProperties properties = new S3FileIOProperties(map);
+
assertThat(properties.isS3DirectoryBucketListPrefixAsDirectory()).isEqualTo(false);
+ }
}
diff --git a/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIO.java
b/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIO.java
index 6caa42fb41..77717d7961 100644
--- a/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIO.java
+++ b/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIO.java
@@ -20,6 +20,8 @@ package org.apache.iceberg.aws.s3;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import static org.mockito.AdditionalAnswers.delegatesTo;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
@@ -34,10 +36,17 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Random;
+import java.util.Spliterator;
+import java.util.Spliterators;
import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.BaseTable;
@@ -58,6 +67,7 @@ import org.apache.iceberg.exceptions.NotFoundException;
import org.apache.iceberg.io.BulkDeletionFailureException;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.FileIOParser;
+import org.apache.iceberg.io.FileInfo;
import org.apache.iceberg.io.IOUtil;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
@@ -76,6 +86,7 @@ import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;
+import org.mockito.Mockito;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
import software.amazon.awssdk.regions.Region;
@@ -86,7 +97,11 @@ import
software.amazon.awssdk.services.s3.model.CreateBucketRequest;
import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse;
import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
+import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
+import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
import software.amazon.awssdk.services.s3.model.S3Error;
+import software.amazon.awssdk.services.s3.model.S3Object;
+import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Iterable;
@ExtendWith(S3MockExtension.class)
public class TestS3FileIO {
@@ -101,6 +116,9 @@ public class TestS3FileIO {
private final int batchDeletionSize = 5;
private S3FileIO s3FileIO;
+ private static final String S3_GENERAL_PURPOSE_BUCKET = "bucket";
+ private static final String S3_DIRECTORY_BUCKET =
"directory-bucket-usw2-az1--x-s3";
+
private final Map<String, String> properties =
ImmutableMap.of(
"s3.write.tags.tagKey1",
@@ -112,7 +130,7 @@ public class TestS3FileIO {
public void before() {
s3FileIO = new S3FileIO(() -> s3mock);
s3FileIO.initialize(properties);
- createBucket("bucket");
+ createBucket(S3_GENERAL_PURPOSE_BUCKET);
for (int i = 1; i <= numBucketsForBatchDeletion; i++) {
createBucket(batchDeletionBucketPrefix + i);
}
@@ -257,6 +275,89 @@ public class TestS3FileIO {
assertThat(Streams.stream(s3FileIO.listPrefix(prefix)).count()).isEqualTo(totalFiles);
}
+ /**
+ * Tests that we correctly append the trailing slash for S3 Express buckets. Currently the Adobe
+ * S3Mock doesn't cater for express buckets, e.g. when you call createBucket with S3 Express
+ * configurations it still just returns a general purpose bucket. TODO: Update to use S3Mock when
+ * it behaves as expected.
+ */
+ @Test
+ public void testPrefixListWithExpressAddSlash() {
+ assertPrefixIsAddedCorrectly("path/to/list", properties);
+
+ Map<String, String> newProperties =
+ ImmutableMap.of(
+ "s3.write.tags.tagKey1",
+ "TagValue1",
+ "s3.delete.batch-size",
+ Integer.toString(batchDeletionSize),
+ "s3.directory-bucket.list-prefix-as-directory",
+ "true");
+ assertPrefixIsAddedCorrectly("path/to/list/", newProperties);
+ }
+
+ public void assertPrefixIsAddedCorrectly(String suffix, Map<String, String>
props) {
+ String prefix = String.format("s3://%s/%s", S3_DIRECTORY_BUCKET, suffix);
+
+ S3Client localMockedClient = mock(S3Client.class);
+
+ List<S3Object> s3Objects =
+ Arrays.asList(
+ S3Object.builder()
+ .key("path/to/list/file1.txt")
+ .size(1024L)
+ .lastModified(Instant.now())
+ .build(),
+ S3Object.builder()
+ .key("path/to/list/file2.txt")
+ .size(2048L)
+ .lastModified(Instant.now().minusSeconds(60))
+ .build());
+
+ ListObjectsV2Response response =
ListObjectsV2Response.builder().contents(s3Objects).build();
+
+ ListObjectsV2Iterable mockedResponse = mock(ListObjectsV2Iterable.class);
+
+ Mockito.when(mockedResponse.stream()).thenReturn(Stream.of(response));
+
+ Mockito.when(
+ localMockedClient.listObjectsV2Paginator(
+ ListObjectsV2Request.builder()
+ .prefix("path/to/list/")
+ .bucket(S3_DIRECTORY_BUCKET)
+ .build()))
+ .thenReturn(mockedResponse);
+
+ // Initialize S3FileIO with the mocked client
+ S3FileIO localS3FileIo = new S3FileIO(() -> localMockedClient);
+ localS3FileIo.initialize(props);
+
+ // Perform the listing
+ List<FileInfo> fileInfoList =
+ StreamSupport.stream(
+ Spliterators.spliteratorUnknownSize(
+ localS3FileIo.listPrefix(prefix).iterator(),
Spliterator.ORDERED),
+ false)
+ .collect(Collectors.toList());
+
+ // Assert that the returned FileInfo instances match the expected values
+ assertEquals(2, fileInfoList.size());
+ assertTrue(
+ fileInfoList.stream()
+ .anyMatch(
+ fi ->
+ fi.location().endsWith("file1.txt")
+ && fi.size() == 1024
+ && fi.createdAtMillis() >
Instant.now().minusSeconds(120).toEpochMilli()));
+ assertTrue(
+ fileInfoList.stream()
+ .anyMatch(
+ fi ->
+ fi.location().endsWith("file2.txt")
+ && fi.size() == 2048
+ && fi.createdAtMillis() <
Instant.now().minusSeconds(30).toEpochMilli()));
+ }
+
/**
* Ignoring because the test is flaky, failing with 500s from S3Mock.
Coverage of prefix delete
* exists through integration tests.
diff --git a/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3URI.java
b/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3URI.java
index 383ff67d16..d3f8ac35d4 100644
--- a/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3URI.java
+++ b/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3URI.java
@@ -28,6 +28,7 @@ import
org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.junit.jupiter.api.Test;
public class TestS3URI {
+ private static final String S3_DIRECTORY_BUCKET =
"directory-bucket-usw2-az1--x-s3";
@Test
public void testLocationParsing() {
@@ -96,4 +97,46 @@ public class TestS3URI {
assertThat(uri1.key()).isEqualTo("path/to/file");
assertThat(uri1.toString()).isEqualTo(p1);
}
+
+ @Test
+ public void testS3URIUseS3DirectoryBucket() {
+ assertThat(
+ new S3URI(String.format("s3://%s/path/to/file",
S3_DIRECTORY_BUCKET))
+ .useS3DirectoryBucket())
+ .isTrue();
+ assertThat(new
S3URI("s3://bucket/path/to/file").useS3DirectoryBucket()).isFalse();
+ assertThat(
+ new S3URI("s3://bucket/path/to/file", ImmutableMap.of("bucket",
S3_DIRECTORY_BUCKET))
+ .useS3DirectoryBucket())
+ .isTrue();
+ assertThat(
+ new S3URI("s3://bucket/path/to/file", ImmutableMap.of("bucket",
"bucket2"))
+ .useS3DirectoryBucket())
+ .isFalse();
+ }
+
+ @Test
+ public void testS3URIToDirectoryPath() {
+ assertThat(new
S3URI("s3://bucket/path/to/file").toDirectoryPath().location())
+ .isEqualTo("s3://bucket/path/to/file/");
+ assertThat(new
S3URI("s3://bucket/path/to/file/").toDirectoryPath().location())
+ .isEqualTo("s3://bucket/path/to/file/");
+ assertThat(new
S3URI("s3a://bucket/path/to/file").toDirectoryPath().location())
+ .isEqualTo("s3a://bucket/path/to/file/");
+ assertThat(
+ new S3URI(String.format("s3://%s/path/to/file",
S3_DIRECTORY_BUCKET))
+ .toDirectoryPath()
+ .location())
+ .isEqualTo(String.format("s3://%s/path/to/file/",
S3_DIRECTORY_BUCKET));
+ assertThat(
+ new S3URI("s3://bucket/path/to/file", ImmutableMap.of("bucket",
S3_DIRECTORY_BUCKET))
+ .toDirectoryPath()
+ .location())
+ .isEqualTo(String.format("s3://%s/path/to/file/",
S3_DIRECTORY_BUCKET));
+ assertThat(
+ new S3URI("s3://bucket/path/to/file", ImmutableMap.of("bucket",
"bucket2"))
+ .toDirectoryPath()
+ .location())
+ .isEqualTo("s3://bucket2/path/to/file/");
+ }
}