jackye1995 commented on a change in pull request #4052:
URL: https://github.com/apache/iceberg/pull/4052#discussion_r810398041



##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java
##########
@@ -100,6 +114,66 @@ public void deleteFile(String path) {
     client().deleteObject(deleteRequest);
   }
 
+  /**
+   * Deletes the given paths in a batched manner.
+   * <p>
+   * The paths are grouped by bucket, and deletion is triggered when we either reach the configured batch size
+   * or have a final remainder batch for each bucket.
+   *
+   * @param paths paths to delete
+   */
+  @Override
+  public void deleteFiles(Iterable<String> paths) {
+    SetMultimap<String, String> bucketToObjects = Multimaps.newSetMultimap(Maps.newHashMap(), Sets::newHashSet);
+    List<String> failedDeletions = Lists.newArrayList();
+    for (String path : paths) {
+      S3URI location = new S3URI(path);
+      String bucket = location.bucket();
+      String objectKey = location.key();
+      Set<String> objectsInBucket = bucketToObjects.get(bucket);
+      if (objectsInBucket.size() == awsProperties.s3FileIoDeleteBatchSize()) {
+        List<String> failedDeletionsForBatch = deleteObjectsInBucket(bucket, objectsInBucket);
+        failedDeletions.addAll(failedDeletionsForBatch);
+        bucketToObjects.removeAll(bucket);
+      }
+      bucketToObjects.get(bucket).add(objectKey);
+    }
+    // Delete the remainder
+    List<List<String>> remainderFailedObjects = bucketToObjects
+        .asMap()
+        .entrySet()
+        .stream()
+        .map(entry -> deleteObjectsInBucket(entry.getKey(), entry.getValue()))
+        .collect(Collectors.toList());
+
+    remainderFailedObjects.forEach(failedDeletions::addAll);
+    if (!failedDeletions.isEmpty()) {
+      throw new S3BatchDeletionException(String.format("Failed to delete %d objects. Failed objects: %s",

Review comment:
      I think this will not be just for S3; we might want to add it to the api package as a generic `BatchDeleteFailedException`. We can also make the constructor fit the input, since this is a custom exception class.
   
   Printing all the failed object paths might be a bit too much: if there are many file paths, it will make the log explode. I think a count is sufficient.
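   
   For illustration, a minimal sketch of what such a generic exception could look like (the class name follows the suggestion above; the package and the accessor are assumptions, not existing API):

```java
package org.apache.iceberg.exceptions;

import java.util.List;

/**
 * Generic batch-deletion failure: the constructor takes the failed paths
 * directly, and the message reports only a count so logs stay bounded.
 */
public class BatchDeleteFailedException extends RuntimeException {
  private final List<String> failedPaths;

  public BatchDeleteFailedException(List<String> failedPaths) {
    super(String.format("Failed to delete %d files", failedPaths.size()));
    this.failedPaths = failedPaths;
  }

  // Callers that do need the full list can still retrieve it programmatically.
  public List<String> failedPaths() {
    return failedPaths;
  }
}
```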

##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java
##########
@@ -100,6 +114,66 @@ public void deleteFile(String path) {
     client().deleteObject(deleteRequest);
   }
 
+  /**
+   * Deletes the given paths in a batched manner.
+   * <p>
+   * The paths are grouped by bucket, and deletion is triggered when we either reach the configured batch size
+   * or have a final remainder batch for each bucket.
+   *
+   * @param paths paths to delete
+   */
+  @Override
+  public void deleteFiles(Iterable<String> paths) {
+    SetMultimap<String, String> bucketToObjects = Multimaps.newSetMultimap(Maps.newHashMap(), Sets::newHashSet);
+    List<String> failedDeletions = Lists.newArrayList();
+    for (String path : paths) {
+      S3URI location = new S3URI(path);
+      String bucket = location.bucket();
+      String objectKey = location.key();
+      Set<String> objectsInBucket = bucketToObjects.get(bucket);
+      if (objectsInBucket.size() == awsProperties.s3FileIoDeleteBatchSize()) {
+        List<String> failedDeletionsForBatch = deleteObjectsInBucket(bucket, objectsInBucket);
+        failedDeletions.addAll(failedDeletionsForBatch);
+        bucketToObjects.removeAll(bucket);
+      }
+      bucketToObjects.get(bucket).add(objectKey);
+    }
+    // Delete the remainder
+    List<List<String>> remainderFailedObjects = bucketToObjects
+        .asMap()
+        .entrySet()
+        .stream()
+        .map(entry -> deleteObjectsInBucket(entry.getKey(), entry.getValue()))
+        .collect(Collectors.toList());
+
+    remainderFailedObjects.forEach(failedDeletions::addAll);
+    if (!failedDeletions.isEmpty()) {
+      throw new S3BatchDeletionException(String.format("Failed to delete %d objects. Failed objects: %s",
+              failedDeletions.size(), failedDeletions));
+    }
+  }
+
+  private List<String> deleteObjectsInBucket(String bucket, Collection<String> objects) {
+    if (!objects.isEmpty()) {
+      List<ObjectIdentifier> objectIds = objects
+          .stream()
+          .map(objectKey -> ObjectIdentifier.builder().key(objectKey).build())
+          .collect(Collectors.toList());
+      DeleteObjectsRequest deleteObjectsRequest = DeleteObjectsRequest.builder()
+          .bucket(bucket)
+          .delete(Delete.builder().objects(objectIds).build())
+          .build();
+      DeleteObjectsResponse response = client().deleteObjects(deleteObjectsRequest);
+      if (response.hasErrors()) {
+        return response.errors()
+            .stream()
+            .map(S3Error::key)
+            .collect(Collectors.toList());
+      }
+    }

Review comment:
       nit: newline after `if` block
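   
   For illustration, a blank line between the `if` block and whatever follows (the tail of the method is cut off in this hunk, so the closing part is assumed):

```java
      if (response.hasErrors()) {
        return response.errors()
            .stream()
            .map(S3Error::key)
            .collect(Collectors.toList());
      }

      // ... rest of the method (not shown in this hunk)
```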

##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java
##########
@@ -100,6 +114,66 @@ public void deleteFile(String path) {
     client().deleteObject(deleteRequest);
   }
 
+  /**
+   * Deletes the given paths in a batched manner.
+   * <p>
+   * The paths are grouped by bucket, and deletion is triggered when we either reach the configured batch size
+   * or have a final remainder batch for each bucket.
+   *
+   * @param paths paths to delete
+   */
+  @Override
+  public void deleteFiles(Iterable<String> paths) {
+    SetMultimap<String, String> bucketToObjects = Multimaps.newSetMultimap(Maps.newHashMap(), Sets::newHashSet);
+    List<String> failedDeletions = Lists.newArrayList();
+    for (String path : paths) {
+      S3URI location = new S3URI(path);
+      String bucket = location.bucket();
+      String objectKey = location.key();
+      Set<String> objectsInBucket = bucketToObjects.get(bucket);
+      if (objectsInBucket.size() == awsProperties.s3FileIoDeleteBatchSize()) {
+        List<String> failedDeletionsForBatch = deleteObjectsInBucket(bucket, objectsInBucket);
+        failedDeletions.addAll(failedDeletionsForBatch);
+        bucketToObjects.removeAll(bucket);
+      }
+      bucketToObjects.get(bucket).add(objectKey);
+    }

Review comment:
       nit: newline after `for` block
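   
   i.e., a blank line between the accumulation loop and the remainder pass:

```java
      bucketToObjects.get(bucket).add(objectKey);
    }

    // Delete the remainder
```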

##########
File path: aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java
##########
@@ -300,6 +312,11 @@ public AwsProperties(Map<String, String> properties) {
     this.isS3ChecksumEnabled = PropertyUtil.propertyAsBoolean(properties, S3_CHECKSUM_ENABLED,
         S3_CHECKSUM_ENABLED_DEFAULT);
 
+    this.s3FileIoDeleteBatchSize = PropertyUtil.propertyAsInt(properties, S3FILEIO_DELETE_BATCH_SIZE,
+        S3FILEIO_DELETE_BATCH_SIZE_DEFAULT);
+    Preconditions.checkArgument(s3FileIoDeleteBatchSize > 0 && s3FileIoDeleteBatchSize < 1000,

Review comment:
      `1000` should be a static constant rather than an inline magic number.
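   
   For example (the constant name and message text are illustrative, not from the PR):

```java
  // The S3 DeleteObjects API accepts at most 1000 keys per request.
  private static final int S3FILEIO_DELETE_BATCH_SIZE_MAX = 1000;

  // ... then in the constructor:
  Preconditions.checkArgument(s3FileIoDeleteBatchSize > 0 && s3FileIoDeleteBatchSize < S3FILEIO_DELETE_BATCH_SIZE_MAX,
      "Deletion batch size must be positive and less than %s", S3FILEIO_DELETE_BATCH_SIZE_MAX);
```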




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


