This is an automated email from the ASF dual-hosted git repository.

jackye pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 4850b622c7 AWS: Support S3 directory bucket listing (#11021)
4850b622c7 is described below

commit 4850b622c778deb4b234880bfd7643070e0a5458
Author: stubz151 <[email protected]>
AuthorDate: Thu Oct 24 04:45:00 2024 +0100

    AWS: Support S3 directory bucket listing (#11021)
---
 .../org/apache/iceberg/aws/AwsIntegTestUtil.java   |  47 +++++++++-
 .../org/apache/iceberg/aws/glue/GlueTestBase.java  |   2 +-
 .../iceberg/aws/s3/TestS3FileIOIntegration.java    |  73 +++++++++++----
 .../iceberg/aws/s3/TestS3MultipartUpload.java      |   2 +-
 .../java/org/apache/iceberg/aws/s3/S3FileIO.java   |   8 +-
 .../apache/iceberg/aws/s3/S3FileIOProperties.java  |  37 ++++++++
 .../main/java/org/apache/iceberg/aws/s3/S3URI.java |  33 +++++++
 .../apache/iceberg/aws/TestS3FileIOProperties.java |   8 ++
 .../org/apache/iceberg/aws/s3/TestS3FileIO.java    | 103 ++++++++++++++++++++-
 .../java/org/apache/iceberg/aws/s3/TestS3URI.java  |  43 +++++++++
 10 files changed, 333 insertions(+), 23 deletions(-)

diff --git 
a/aws/src/integration/java/org/apache/iceberg/aws/AwsIntegTestUtil.java 
b/aws/src/integration/java/org/apache/iceberg/aws/AwsIntegTestUtil.java
index e9cf474add..6b57cfd682 100644
--- a/aws/src/integration/java/org/apache/iceberg/aws/AwsIntegTestUtil.java
+++ b/aws/src/integration/java/org/apache/iceberg/aws/AwsIntegTestUtil.java
@@ -32,9 +32,11 @@ import software.amazon.awssdk.services.s3.S3Client;
 import software.amazon.awssdk.services.s3.model.Delete;
 import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
 import software.amazon.awssdk.services.s3.model.ListObjectVersionsRequest;
+import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
 import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
 import software.amazon.awssdk.services.s3.model.ObjectVersion;
 import 
software.amazon.awssdk.services.s3.paginators.ListObjectVersionsIterable;
+import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Iterable;
 import software.amazon.awssdk.services.s3control.S3ControlClient;
 import 
software.amazon.awssdk.services.s3control.model.CreateAccessPointRequest;
 import 
software.amazon.awssdk.services.s3control.model.DeleteAccessPointRequest;
@@ -42,6 +44,7 @@ import 
software.amazon.awssdk.services.s3control.model.DeleteAccessPointRequest;
 public class AwsIntegTestUtil {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(AwsIntegTestUtil.class);
+  private static final int BATCH_DELETION_SIZE = 1000;
 
   private AwsIntegTestUtil() {}
 
@@ -106,17 +109,16 @@ public class AwsIntegTestUtil {
     return System.getenv("AWS_TEST_MULTI_REGION_ACCESS_POINT_ALIAS");
   }
 
-  public static void cleanS3Bucket(S3Client s3, String bucketName, String 
prefix) {
+  public static void cleanS3GeneralPurposeBucket(S3Client s3, String 
bucketName, String prefix) {
     ListObjectVersionsIterable response =
         s3.listObjectVersionsPaginator(
             
ListObjectVersionsRequest.builder().bucket(bucketName).prefix(prefix).build());
     List<ObjectVersion> versionsToDelete = Lists.newArrayList();
-    int batchDeletionSize = 1000;
     response.versions().stream()
         .forEach(
             version -> {
               versionsToDelete.add(version);
-              if (versionsToDelete.size() == batchDeletionSize) {
+              if (versionsToDelete.size() == BATCH_DELETION_SIZE) {
                 deleteObjectVersions(s3, bucketName, versionsToDelete);
                 versionsToDelete.clear();
               }
@@ -127,6 +129,45 @@ public class AwsIntegTestUtil {
     }
   }
 
+  /**
+   * Cleans up an S3 directory bucket, which does not support object versions.
+   *
+   * @param s3 an instance of S3Client to be used to list/delete objects
+   * @param bucketName name of the bucket
+   * @param prefix the path prefix we want to remove
+   */
+  public static void cleanS3DirectoryBucket(S3Client s3, String bucketName, 
String prefix) {
+    String newPrefix = prefix.endsWith("/") ? prefix : prefix + "/";
+    ListObjectsV2Request listRequest =
+        
ListObjectsV2Request.builder().bucket(bucketName).prefix(newPrefix).build();
+
+    ListObjectsV2Iterable paginatedListResponse = 
s3.listObjectsV2Paginator(listRequest);
+    List<ObjectIdentifier> objectsToDelete = Lists.newArrayList();
+
+    paginatedListResponse.contents().stream()
+        .forEach(
+            s3Object -> {
+              if (objectsToDelete.size() == BATCH_DELETION_SIZE) {
+                deleteObjects(s3, bucketName, objectsToDelete);
+                objectsToDelete.clear();
+              }
+              
objectsToDelete.add(ObjectIdentifier.builder().key(s3Object.key()).build());
+            });
+
+    if (!objectsToDelete.isEmpty()) {
+      deleteObjects(s3, bucketName, objectsToDelete);
+    }
+  }
+
+  private static void deleteObjects(
+      S3Client s3, String bucketName, List<ObjectIdentifier> objectsToDelete) {
+    s3.deleteObjects(
+        DeleteObjectsRequest.builder()
+            .bucket(bucketName)
+            .delete(Delete.builder().objects(objectsToDelete).build())
+            .build());
+  }
+
   private static void deleteObjectVersions(
       S3Client s3, String bucket, List<ObjectVersion> objectVersions) {
     s3.deleteObjects(
diff --git 
a/aws/src/integration/java/org/apache/iceberg/aws/glue/GlueTestBase.java 
b/aws/src/integration/java/org/apache/iceberg/aws/glue/GlueTestBase.java
index 29076369c8..65e37eba4c 100644
--- a/aws/src/integration/java/org/apache/iceberg/aws/glue/GlueTestBase.java
+++ b/aws/src/integration/java/org/apache/iceberg/aws/glue/GlueTestBase.java
@@ -110,7 +110,7 @@ public class GlueTestBase {
   @AfterAll
   public static void afterClass() {
     AwsIntegTestUtil.cleanGlueCatalog(GLUE, NAMESPACES);
-    AwsIntegTestUtil.cleanS3Bucket(S3, TEST_BUCKET_NAME, TEST_PATH_PREFIX);
+    AwsIntegTestUtil.cleanS3GeneralPurposeBucket(S3, TEST_BUCKET_NAME, 
TEST_PATH_PREFIX);
   }
 
   public static String getRandomName() {
diff --git 
a/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3FileIOIntegration.java
 
b/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3FileIOIntegration.java
index 41a07401a1..9d5d41438a 100644
--- 
a/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3FileIOIntegration.java
+++ 
b/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3FileIOIntegration.java
@@ -108,25 +108,32 @@ public class TestS3FileIOIntegration {
     content = new String(contentBytes, StandardCharsets.UTF_8);
     kmsKeyArn = kms.createKey().keyMetadata().arn();
 
-    AwsIntegTestUtil.createAccessPoint(s3Control, accessPointName, bucketName);
-    AwsIntegTestUtil.createAccessPoint(
-        crossRegionS3Control, crossRegionAccessPointName, 
crossRegionBucketName);
-    multiRegionAccessPointAlias = 
AwsIntegTestUtil.testMultiRegionAccessPointAlias();
-    s3.putBucketVersioning(
-        PutBucketVersioningRequest.builder()
-            .bucket(bucketName)
-            .versioningConfiguration(
-                
VersioningConfiguration.builder().status(BucketVersioningStatus.ENABLED).build())
-            .build());
+    if (!S3URI.isS3DirectoryBucket(bucketName)) {
+      s3.putBucketVersioning(
+          PutBucketVersioningRequest.builder()
+              .bucket(bucketName)
+              .versioningConfiguration(
+                  
VersioningConfiguration.builder().status(BucketVersioningStatus.ENABLED).build())
+              .build());
+      AwsIntegTestUtil.createAccessPoint(s3Control, accessPointName, 
bucketName);
+      AwsIntegTestUtil.createAccessPoint(
+          crossRegionS3Control, crossRegionAccessPointName, 
crossRegionBucketName);
+      multiRegionAccessPointAlias = 
AwsIntegTestUtil.testMultiRegionAccessPointAlias();
+    }
   }
 
   @AfterAll
   public static void afterClass() {
-    AwsIntegTestUtil.cleanS3Bucket(s3, bucketName, prefix);
-    AwsIntegTestUtil.deleteAccessPoint(s3Control, accessPointName);
-    AwsIntegTestUtil.deleteAccessPoint(crossRegionS3Control, 
crossRegionAccessPointName);
-    kms.scheduleKeyDeletion(
-        
ScheduleKeyDeletionRequest.builder().keyId(kmsKeyArn).pendingWindowInDays(7).build());
+    if (S3URI.isS3DirectoryBucket(bucketName)) {
+      S3FileIO s3FileIO = new S3FileIO(clientFactory::s3);
+      AwsIntegTestUtil.cleanS3DirectoryBucket(s3FileIO.client(), bucketName, 
prefix);
+    } else {
+      AwsIntegTestUtil.cleanS3GeneralPurposeBucket(s3, bucketName, prefix);
+      AwsIntegTestUtil.deleteAccessPoint(s3Control, accessPointName);
+      AwsIntegTestUtil.deleteAccessPoint(crossRegionS3Control, 
crossRegionAccessPointName);
+      kms.scheduleKeyDeletion(
+          
ScheduleKeyDeletionRequest.builder().keyId(kmsKeyArn).pendingWindowInDays(7).build());
+    }
   }
 
   @BeforeEach
@@ -171,6 +178,7 @@ public class TestS3FileIOIntegration {
 
   @Test
   public void testNewInputStreamWithAccessPoint() throws Exception {
+    requireAccessPointSupport();
     s3.putObject(
         PutObjectRequest.builder().bucket(bucketName).key(objectKey).build(),
         RequestBody.fromBytes(contentBytes));
@@ -201,12 +209,14 @@ public class TestS3FileIOIntegration {
       S3FileIO s3FileIO = new S3FileIO(clientFactory::s3);
       validateRead(s3FileIO, crossBucketObjectUri);
     } finally {
-      AwsIntegTestUtil.cleanS3Bucket(s3Client, crossRegionBucketName, 
crossBucketObjectKey);
+      AwsIntegTestUtil.cleanS3GeneralPurposeBucket(
+          s3Client, crossRegionBucketName, crossBucketObjectKey);
     }
   }
 
   @Test
   public void testNewInputStreamWithCrossRegionAccessPoint() throws Exception {
+    requireAccessPointSupport();
     
clientFactory.initialize(ImmutableMap.of(S3FileIOProperties.USE_ARN_REGION_ENABLED,
 "true"));
     S3Client s3Client = clientFactory.s3();
     s3Client.putObject(
@@ -258,6 +268,7 @@ public class TestS3FileIOIntegration {
 
   @Test
   public void testNewOutputStreamWithAccessPoint() throws Exception {
+    requireAccessPointSupport();
     S3FileIO s3FileIO = new S3FileIO(clientFactory::s3);
     s3FileIO.initialize(
         ImmutableMap.of(
@@ -273,6 +284,7 @@ public class TestS3FileIOIntegration {
 
   @Test
   public void testNewOutputStreamWithCrossRegionAccessPoint() throws Exception 
{
+    requireAccessPointSupport();
     
clientFactory.initialize(ImmutableMap.of(S3FileIOProperties.USE_ARN_REGION_ENABLED,
 "true"));
     S3Client s3Client = clientFactory.s3();
     S3FileIO s3FileIO = new S3FileIO(clientFactory::s3);
@@ -327,6 +339,7 @@ public class TestS3FileIOIntegration {
 
   @Test
   public void testServerSideKmsEncryption() throws Exception {
+    requireKMSEncryptionSupport();
     S3FileIOProperties properties = new S3FileIOProperties();
     properties.setSseType(S3FileIOProperties.SSE_TYPE_KMS);
     properties.setSseKey(kmsKeyArn);
@@ -342,6 +355,7 @@ public class TestS3FileIOIntegration {
 
   @Test
   public void testServerSideKmsEncryptionWithDefaultKey() throws Exception {
+    requireKMSEncryptionSupport();
     S3FileIOProperties properties = new S3FileIOProperties();
     properties.setSseType(S3FileIOProperties.SSE_TYPE_KMS);
     S3FileIO s3FileIO = new S3FileIO(clientFactory::s3, properties);
@@ -363,6 +377,7 @@ public class TestS3FileIOIntegration {
 
   @Test
   public void testDualLayerServerSideKmsEncryption() throws Exception {
+    requireKMSEncryptionSupport();
     S3FileIOProperties properties = new S3FileIOProperties();
     properties.setSseType(S3FileIOProperties.DSSE_TYPE_KMS);
     properties.setSseKey(kmsKeyArn);
@@ -378,6 +393,7 @@ public class TestS3FileIOIntegration {
 
   @Test
   public void testServerSideCustomEncryption() throws Exception {
+    requireKMSEncryptionSupport();
     // generate key
     KeyGenerator keyGenerator = KeyGenerator.getInstance("AES");
     keyGenerator.init(256, new SecureRandom());
@@ -413,6 +429,7 @@ public class TestS3FileIOIntegration {
 
   @Test
   public void testACL() throws Exception {
+    requireACLSupport();
     S3FileIOProperties properties = new S3FileIOProperties();
     properties.setAcl(ObjectCannedACL.BUCKET_OWNER_FULL_CONTROL);
     S3FileIO s3FileIO = new S3FileIO(clientFactory::s3, properties);
@@ -444,6 +461,7 @@ public class TestS3FileIOIntegration {
 
   @Test
   public void testDeleteFilesMultipleBatchesWithAccessPoints() throws 
Exception {
+    requireAccessPointSupport();
     S3FileIO s3FileIO = new S3FileIO(clientFactory::s3, 
getDeletionTestProperties());
     s3FileIO.initialize(
         ImmutableMap.of(
@@ -454,6 +472,7 @@ public class TestS3FileIOIntegration {
 
   @Test
   public void testDeleteFilesMultipleBatchesWithCrossRegionAccessPoints() 
throws Exception {
+    requireKMSEncryptionSupport();
     
clientFactory.initialize(ImmutableMap.of(S3FileIOProperties.USE_ARN_REGION_ENABLED,
 "true"));
     S3FileIO s3FileIO = new S3FileIO(clientFactory::s3, 
getDeletionTestProperties());
     s3FileIO.initialize(
@@ -515,6 +534,7 @@ public class TestS3FileIOIntegration {
 
   @Test
   public void testFileRecoveryHappyPath() throws Exception {
+    requireVersioningSupport();
     S3FileIO s3FileIO = new S3FileIO(clientFactory::s3, new 
S3FileIOProperties());
     String filePath = String.format("s3://%s/%s/%s", bucketName, prefix, 
"someFile.parquet");
     write(s3FileIO, filePath);
@@ -527,6 +547,7 @@ public class TestS3FileIOIntegration {
 
   @Test
   public void testFileRecoveryFailsToRecover() throws Exception {
+    requireVersioningSupport();
     S3FileIO s3FileIO = new S3FileIO(clientFactory::s3, new 
S3FileIOProperties());
     s3.putBucketVersioning(
         PutBucketVersioningRequest.builder()
@@ -613,4 +634,24 @@ public class TestS3FileIOIntegration {
                     builder -> builder.bucket(s3URI.bucket()).key(s3URI.key() 
+ i).build(),
                     RequestBody.empty()));
   }
+
+  /** S3 Express doesn't support access points */
+  private void requireAccessPointSupport() {
+    Assumptions.assumeThat(S3URI.isS3DirectoryBucket(bucketName)).isFalse();
+  }
+
+  /** S3 Express doesn't support KMS/custom encryption */
+  private void requireKMSEncryptionSupport() {
+    Assumptions.assumeThat(S3URI.isS3DirectoryBucket(bucketName)).isFalse();
+  }
+
+  /** S3 Express doesn't support versioning */
+  private void requireVersioningSupport() {
+    Assumptions.assumeThat(S3URI.isS3DirectoryBucket(bucketName)).isFalse();
+  }
+
+  /** File ACLs aren't supported by S3 Express */
+  private void requireACLSupport() {
+    Assumptions.assumeThat(S3URI.isS3DirectoryBucket(bucketName)).isFalse();
+  }
 }
diff --git 
a/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3MultipartUpload.java 
b/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3MultipartUpload.java
index 29d4c48927..901e9933b1 100644
--- 
a/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3MultipartUpload.java
+++ 
b/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3MultipartUpload.java
@@ -59,7 +59,7 @@ public class TestS3MultipartUpload {
 
   @AfterAll
   public static void afterClass() {
-    AwsIntegTestUtil.cleanS3Bucket(s3, bucketName, prefix);
+    AwsIntegTestUtil.cleanS3GeneralPurposeBucket(s3, bucketName, prefix);
   }
 
   @BeforeEach
diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java 
b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java
index 11a5ce0224..23b246c357 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java
@@ -297,7 +297,13 @@ public class S3FileIO implements CredentialSupplier, 
DelegateFileIO, SupportsRec
 
   @Override
   public Iterable<FileInfo> listPrefix(String prefix) {
-    S3URI s3uri = new S3URI(prefix, 
s3FileIOProperties.bucketToAccessPointMapping());
+    S3URI uri = new S3URI(prefix, 
s3FileIOProperties.bucketToAccessPointMapping());
+    if (uri.useS3DirectoryBucket()
+        && s3FileIOProperties.isS3DirectoryBucketListPrefixAsDirectory()) {
+      uri = uri.toDirectoryPath();
+    }
+
+    S3URI s3uri = uri;
     ListObjectsV2Request request =
         
ListObjectsV2Request.builder().bucket(s3uri.bucket()).prefix(s3uri.key()).build();
 
diff --git 
a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIOProperties.java 
b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIOProperties.java
index 3a43880f31..5da758704a 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIOProperties.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIOProperties.java
@@ -428,6 +428,25 @@ public class S3FileIOProperties implements Serializable {
 
   public static final long S3_RETRY_MAX_WAIT_MS_DEFAULT = 20_000; // 20 seconds
 
+  /**
+   * Controls whether to list prefixes as directories for S3 directory buckets. The default value
+   * is true, in which case a trailing "/" is appended to the prefix.
+   *
+   * <p>Example: s3://bucket/prefix will be shown as s3://bucket/prefix/
+   *
+   * <p>For more details see delimiter section in:
+   * 
https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html#API_ListObjectsV2_RequestSyntax
+   *
+   * <p>If set to false, this will throw an error when the "/" is not provided for a directory
+   * bucket.
+   * Turn off this feature if you are using S3FileIO.listPrefix for listing 
bucket prefixes that are
+   * not directories. This would ensure correctness and fail the operation 
based on S3 requirement
+   * when listing against a non-directory prefix in a directory bucket.
+   */
+  public static final String S3_DIRECTORY_BUCKET_LIST_PREFIX_AS_DIRECTORY =
+      "s3.directory-bucket.list-prefix-as-directory";
+
+  public static final boolean 
S3_DIRECTORY_BUCKET_LIST_PREFIX_AS_DIRECTORY_DEFAULT = true;
+
   private String sseType;
   private String sseKey;
   private String sseMd5;
@@ -462,6 +481,8 @@ public class S3FileIOProperties implements Serializable {
   private int s3RetryNumRetries;
   private long s3RetryMinWaitMs;
   private long s3RetryMaxWaitMs;
+
+  private boolean s3DirectoryBucketListPrefixAsDirectory;
   private final Map<String, String> allProperties;
 
   public S3FileIOProperties() {
@@ -498,6 +519,8 @@ public class S3FileIOProperties implements Serializable {
     this.s3RetryNumRetries = S3_RETRY_NUM_RETRIES_DEFAULT;
     this.s3RetryMinWaitMs = S3_RETRY_MIN_WAIT_MS_DEFAULT;
     this.s3RetryMaxWaitMs = S3_RETRY_MAX_WAIT_MS_DEFAULT;
+    this.s3DirectoryBucketListPrefixAsDirectory =
+        S3_DIRECTORY_BUCKET_LIST_PREFIX_AS_DIRECTORY_DEFAULT;
     this.allProperties = Maps.newHashMap();
 
     ValidationException.check(
@@ -605,6 +628,11 @@ public class S3FileIOProperties implements Serializable {
         PropertyUtil.propertyAsLong(properties, S3_RETRY_MIN_WAIT_MS, 
S3_RETRY_MIN_WAIT_MS_DEFAULT);
     this.s3RetryMaxWaitMs =
         PropertyUtil.propertyAsLong(properties, S3_RETRY_MAX_WAIT_MS, 
S3_RETRY_MAX_WAIT_MS_DEFAULT);
+    this.s3DirectoryBucketListPrefixAsDirectory =
+        PropertyUtil.propertyAsBoolean(
+            properties,
+            S3_DIRECTORY_BUCKET_LIST_PREFIX_AS_DIRECTORY,
+            S3_DIRECTORY_BUCKET_LIST_PREFIX_AS_DIRECTORY_DEFAULT);
 
     ValidationException.check(
         keyIdAccessKeyBothConfigured(),
@@ -837,6 +865,15 @@ public class S3FileIOProperties implements Serializable {
     return (long) s3RetryNumRetries() * s3RetryMaxWaitMs();
   }
 
+  public boolean isS3DirectoryBucketListPrefixAsDirectory() {
+    return s3DirectoryBucketListPrefixAsDirectory;
+  }
+
+  public void setS3DirectoryBucketListPrefixAsDirectory(
+      boolean s3DirectoryBucketListPrefixAsDirectory) {
+    this.s3DirectoryBucketListPrefixAsDirectory = 
s3DirectoryBucketListPrefixAsDirectory;
+  }
+
   private boolean keyIdAccessKeyBothConfigured() {
     return (accessKeyId == null) == (secretAccessKey == null);
   }
diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3URI.java 
b/aws/src/main/java/org/apache/iceberg/aws/s3/S3URI.java
index 79b4e695de..9cfba5fca3 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3URI.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3URI.java
@@ -37,6 +37,9 @@ class S3URI {
   private static final String QUERY_DELIM = "\\?";
   private static final String FRAGMENT_DELIM = "#";
 
+  /** Suffix of S3Express storage bucket names. */
+  private static final String S3_DIRECTORY_BUCKET_SUFFIX = "--x-s3";
+
   private final String location;
   private final String scheme;
   private final String bucket;
@@ -115,4 +118,34 @@ class S3URI {
   public String toString() {
     return location;
   }
+
+  /**
+   * Converts the current S3URI to a directory path.
+   *
+   * <p>This method ensures that the S3URI represents a directory by adding a 
"/" delimiter at the
+   * end of the prefix if it's not already present.
+   *
+   * @return a S3URI with the directory path configured
+   */
+  public S3URI toDirectoryPath() {
+    if (key.endsWith(PATH_DELIM)) {
+      return this;
+    }
+    return new S3URI(String.format("%s://%s/%s/", scheme, bucket, key));
+  }
+
+  public boolean useS3DirectoryBucket() {
+    return isS3DirectoryBucket(this.bucket);
+  }
+
+  /**
+   * Check if the bucket name indicates the bucket is a directory bucket. This 
method does not check
+   * against the S3 service.
+   *
+   * @param bucket bucket to probe.
+   * @return true if the bucket name indicates the bucket is a directory bucket
+   */
+  public static boolean isS3DirectoryBucket(final String bucket) {
+    return bucket.endsWith(S3_DIRECTORY_BUCKET_SUFFIX);
+  }
 }
diff --git 
a/aws/src/test/java/org/apache/iceberg/aws/TestS3FileIOProperties.java 
b/aws/src/test/java/org/apache/iceberg/aws/TestS3FileIOProperties.java
index e2499e9476..58332d4258 100644
--- a/aws/src/test/java/org/apache/iceberg/aws/TestS3FileIOProperties.java
+++ b/aws/src/test/java/org/apache/iceberg/aws/TestS3FileIOProperties.java
@@ -312,4 +312,12 @@ public class TestS3FileIOProperties {
     s3Properties.applyS3AccessGrantsConfigurations(builder);
     assertThat(builder.plugins().size()).isEqualTo(0);
   }
+
+  @Test
+  public void testIsTreatS3DirectoryBucketListPrefixAsDirectoryEnabled() {
+    Map<String, String> map = Maps.newHashMap();
+    map.put(S3FileIOProperties.S3_DIRECTORY_BUCKET_LIST_PREFIX_AS_DIRECTORY, 
"false");
+    S3FileIOProperties properties = new S3FileIOProperties(map);
+    
assertThat(properties.isS3DirectoryBucketListPrefixAsDirectory()).isEqualTo(false);
+  }
 }
diff --git a/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIO.java 
b/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIO.java
index 6caa42fb41..77717d7961 100644
--- a/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIO.java
+++ b/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIO.java
@@ -20,6 +20,8 @@ package org.apache.iceberg.aws.s3;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.AdditionalAnswers.delegatesTo;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doReturn;
@@ -34,10 +36,17 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.Spliterator;
+import java.util.Spliterators;
 import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.BaseTable;
@@ -58,6 +67,7 @@ import org.apache.iceberg.exceptions.NotFoundException;
 import org.apache.iceberg.io.BulkDeletionFailureException;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.FileIOParser;
+import org.apache.iceberg.io.FileInfo;
 import org.apache.iceberg.io.IOUtil;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.io.OutputFile;
@@ -76,6 +86,7 @@ import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.api.extension.RegisterExtension;
+import org.mockito.Mockito;
 import software.amazon.awssdk.core.sync.RequestBody;
 import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
 import software.amazon.awssdk.regions.Region;
@@ -86,7 +97,11 @@ import 
software.amazon.awssdk.services.s3.model.CreateBucketRequest;
 import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
 import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse;
 import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
+import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
+import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
 import software.amazon.awssdk.services.s3.model.S3Error;
+import software.amazon.awssdk.services.s3.model.S3Object;
+import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Iterable;
 
 @ExtendWith(S3MockExtension.class)
 public class TestS3FileIO {
@@ -101,6 +116,9 @@ public class TestS3FileIO {
   private final int batchDeletionSize = 5;
   private S3FileIO s3FileIO;
 
+  private static final String S3_GENERAL_PURPOSE_BUCKET = "bucket";
+  private static final String S3_DIRECTORY_BUCKET = 
"directory-bucket-usw2-az1--x-s3";
+
   private final Map<String, String> properties =
       ImmutableMap.of(
           "s3.write.tags.tagKey1",
@@ -112,7 +130,7 @@ public class TestS3FileIO {
   public void before() {
     s3FileIO = new S3FileIO(() -> s3mock);
     s3FileIO.initialize(properties);
-    createBucket("bucket");
+    createBucket(S3_GENERAL_PURPOSE_BUCKET);
     for (int i = 1; i <= numBucketsForBatchDeletion; i++) {
       createBucket(batchDeletionBucketPrefix + i);
     }
@@ -257,6 +275,89 @@ public class TestS3FileIO {
     
assertThat(Streams.stream(s3FileIO.listPrefix(prefix)).count()).isEqualTo(totalFiles);
   }
 
+  /**
+   * Tests that we correctly append the trailing slash ("/") for S3 Express buckets. Currently the
+   * Adobe S3Mock doesn't cater for express buckets, e.g. when you call createBucket with S3
+   * Express configurations it still just returns a general purpose bucket.
+   *
+   * <p>TODO: Update to use S3Mock when it behaves as expected.
+   */
+  @Test
+  public void testPrefixListWithExpressAddSlash() {
+    assertPrefixIsAddedCorrectly("path/to/list", properties);
+
+    Map<String, String> newProperties =
+        ImmutableMap.of(
+            "s3.write.tags.tagKey1",
+            "TagValue1",
+            "s3.delete.batch-size",
+            Integer.toString(batchDeletionSize),
+            "s3.directory-bucket.list-prefix-as-directory",
+            "true");
+    assertPrefixIsAddedCorrectly("path/to/list/", newProperties);
+  }
+
+  public void assertPrefixIsAddedCorrectly(String suffix, Map<String, String> 
props) {
+    String prefix = String.format("s3://%s/%s", S3_DIRECTORY_BUCKET, suffix);
+
+    S3Client localMockedClient = mock(S3Client.class);
+
+    List<S3Object> s3Objects =
+        Arrays.asList(
+            S3Object.builder()
+                .key("path/to/list/file1.txt")
+                .size(1024L)
+                .lastModified(Instant.now())
+                .build(),
+            S3Object.builder()
+                .key("path/to/list/file2.txt")
+                .size(2048L)
+                .lastModified(Instant.now().minusSeconds(60))
+                .build());
+
+    ListObjectsV2Response response = 
ListObjectsV2Response.builder().contents(s3Objects).build();
+
+    ListObjectsV2Iterable mockedResponse = mock(ListObjectsV2Iterable.class);
+
+    Mockito.when(mockedResponse.stream()).thenReturn(Stream.of(response));
+
+    Mockito.when(
+            localMockedClient.listObjectsV2Paginator(
+                ListObjectsV2Request.builder()
+                    .prefix("path/to/list/")
+                    .bucket(S3_DIRECTORY_BUCKET)
+                    .build()))
+        .thenReturn(mockedResponse);
+
+    // Initialize S3FileIO with the mocked client
+    S3FileIO localS3FileIo = new S3FileIO(() -> localMockedClient);
+    localS3FileIo.initialize(props);
+
+    // Perform the listing
+    List<FileInfo> fileInfoList =
+        StreamSupport.stream(
+                Spliterators.spliteratorUnknownSize(
+                    localS3FileIo.listPrefix(prefix).iterator(), 
Spliterator.ORDERED),
+                false)
+            .collect(Collectors.toList());
+
+    // Assert that the returned FileInfo instances match the expected values
+    assertEquals(2, fileInfoList.size());
+    assertTrue(
+        fileInfoList.stream()
+            .anyMatch(
+                fi ->
+                    fi.location().endsWith("file1.txt")
+                        && fi.size() == 1024
+                        && fi.createdAtMillis() > 
Instant.now().minusSeconds(120).toEpochMilli()));
+    assertTrue(
+        fileInfoList.stream()
+            .anyMatch(
+                fi ->
+                    fi.location().endsWith("file2.txt")
+                        && fi.size() == 2048
+                        && fi.createdAtMillis() < 
Instant.now().minusSeconds(30).toEpochMilli()));
+  }
+
   /**
    * Ignoring because the test is flaky, failing with 500s from S3Mock. 
Coverage of prefix delete
    * exists through integration tests.
diff --git a/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3URI.java 
b/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3URI.java
index 383ff67d16..d3f8ac35d4 100644
--- a/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3URI.java
+++ b/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3URI.java
@@ -28,6 +28,7 @@ import 
org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.junit.jupiter.api.Test;
 
 public class TestS3URI {
+  private static final String S3_DIRECTORY_BUCKET = 
"directory-bucket-usw2-az1--x-s3";
 
   @Test
   public void testLocationParsing() {
@@ -96,4 +97,46 @@ public class TestS3URI {
     assertThat(uri1.key()).isEqualTo("path/to/file");
     assertThat(uri1.toString()).isEqualTo(p1);
   }
+
+  @Test
+  public void testS3URIUseS3DirectoryBucket() {
+    assertThat(
+            new S3URI(String.format("s3://%s/path/to/file", 
S3_DIRECTORY_BUCKET))
+                .useS3DirectoryBucket())
+        .isTrue();
+    assertThat(new 
S3URI("s3://bucket/path/to/file").useS3DirectoryBucket()).isFalse();
+    assertThat(
+            new S3URI("s3://bucket/path/to/file", ImmutableMap.of("bucket", 
S3_DIRECTORY_BUCKET))
+                .useS3DirectoryBucket())
+        .isTrue();
+    assertThat(
+            new S3URI("s3://bucket/path/to/file", ImmutableMap.of("bucket", 
"bucket2"))
+                .useS3DirectoryBucket())
+        .isFalse();
+  }
+
+  @Test
+  public void testS3URIToDirectoryPath() {
+    assertThat(new 
S3URI("s3://bucket/path/to/file").toDirectoryPath().location())
+        .isEqualTo("s3://bucket/path/to/file/");
+    assertThat(new 
S3URI("s3://bucket/path/to/file/").toDirectoryPath().location())
+        .isEqualTo("s3://bucket/path/to/file/");
+    assertThat(new 
S3URI("s3a://bucket/path/to/file").toDirectoryPath().location())
+        .isEqualTo("s3a://bucket/path/to/file/");
+    assertThat(
+            new S3URI(String.format("s3://%s/path/to/file", 
S3_DIRECTORY_BUCKET))
+                .toDirectoryPath()
+                .location())
+        .isEqualTo(String.format("s3://%s/path/to/file/", 
S3_DIRECTORY_BUCKET));
+    assertThat(
+            new S3URI("s3://bucket/path/to/file", ImmutableMap.of("bucket", 
S3_DIRECTORY_BUCKET))
+                .toDirectoryPath()
+                .location())
+        .isEqualTo(String.format("s3://%s/path/to/file/", 
S3_DIRECTORY_BUCKET));
+    assertThat(
+            new S3URI("s3://bucket/path/to/file", ImmutableMap.of("bucket", 
"bucket2"))
+                .toDirectoryPath()
+                .location())
+        .isEqualTo("s3://bucket2/path/to/file/");
+  }
 }

Reply via email to