amogh-jahagirdar commented on a change in pull request #4052:
URL: https://github.com/apache/iceberg/pull/4052#discussion_r814044074



##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java
##########
@@ -100,6 +115,67 @@ public void deleteFile(String path) {
     client().deleteObject(deleteRequest);
   }
 
+  /**
+   * Deletes the given paths in a batched manner.
+   * <p>
+   * The paths are grouped by bucket, and deletion is triggered when we either reach the configured batch size
+   * or have a final remainder batch for each bucket.
+   *
+   * @param paths paths to delete
+   */
+  @Override
+  public void deleteFiles(Iterable<String> paths) {
+    SetMultimap<String, String> bucketToObjects = Multimaps.newSetMultimap(Maps.newHashMap(), Sets::newHashSet);
+    List<String> failedDeletions = Lists.newArrayList();
+    for (String path : paths) {
+      S3URI location = new S3URI(path);
+      String bucket = location.bucket();
+      String objectKey = location.key();
+      Set<String> objectsInBucket = bucketToObjects.get(bucket);
+      if (objectsInBucket.size() == awsProperties.s3FileIoDeleteBatchSize()) {
+        List<String> failedDeletionsForBatch = deleteObjectsInBucket(bucket, objectsInBucket);
+        failedDeletions.addAll(failedDeletionsForBatch);
+        bucketToObjects.removeAll(bucket);
+      }
+      bucketToObjects.get(bucket).add(objectKey);
+    }
+
+    // Delete the remainder
+    List<List<String>> remainderFailedObjects = bucketToObjects
+        .asMap()
+        .entrySet()
+        .stream()
+        .map(entry -> deleteObjectsInBucket(entry.getKey(), entry.getValue()))
+        .collect(Collectors.toList());
+
+    remainderFailedObjects.forEach(failedDeletions::addAll);
+    if (!failedDeletions.isEmpty()) {
+      throw new FileIODeletionException(failedDeletions);
+    }
+  }
+
+  private List<String> deleteObjectsInBucket(String bucket, Collection<String> objects) {
+    if (!objects.isEmpty()) {
+      List<ObjectIdentifier> objectIds = objects
+          .stream()
+          .map(objectKey -> ObjectIdentifier.builder().key(objectKey).build())
+          .collect(Collectors.toList());
+      DeleteObjectsRequest deleteObjectsRequest = DeleteObjectsRequest.builder()
+          .bucket(bucket)
+          .delete(Delete.builder().objects(objectIds).build())
+          .build();
+      DeleteObjectsResponse response = client().deleteObjects(deleteObjectsRequest);
+      if (response.hasErrors()) {
+        return response.errors()
+            .stream()
+            .map(S3Error::key)

Review comment:
       Ah yeah, that's true. One thing to note, though, is that the exception surfaced to users on bulk deletion failure includes only the number of failures rather than the full list of paths, since the full list can be quite noisy in users' logs.
   
   I do still think we should construct the full S3 paths in case someone wants to use them in the future, but again, they aren't used when we surface the failure to users.
   
   Do you think it would be useful if the S3 batch deletion exception message listed the first 10 (or some other small number) paths that we failed to delete?
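   
   For illustration, a minimal sketch of what that could look like, using a hypothetical `BulkDeletionFailureException` and an assumed cap of 10 paths (neither is the actual class or constant in this PR):
   
   ```java
   // Hypothetical sketch only: truncate the failed-path list in the message
   // while keeping the full list available programmatically.
   import java.util.List;
   import java.util.stream.Collectors;
   
   class BulkDeletionFailureException extends RuntimeException {
     // Assumed cap for illustration; could be made configurable.
     private static final int MAX_PATHS_IN_MESSAGE = 10;
   
     private final List<String> failedPaths;
   
     BulkDeletionFailureException(List<String> failedPaths) {
       super(buildMessage(failedPaths));
       this.failedPaths = failedPaths;
     }
   
     // Full list retained for callers that need every failed path.
     List<String> failedPaths() {
       return failedPaths;
     }
   
     private static String buildMessage(List<String> failedPaths) {
       String sample = failedPaths.stream()
           .limit(MAX_PATHS_IN_MESSAGE)
           .collect(Collectors.joining(", "));
       int remaining = failedPaths.size() - MAX_PATHS_IN_MESSAGE;
       if (remaining > 0) {
         return String.format("Failed to delete %d paths, including: %s (and %d more)",
             failedPaths.size(), sample, remaining);
       }
       return String.format("Failed to delete %d paths: %s", failedPaths.size(), sample);
     }
   }
   ```
   
   That would keep the failure count front and center while still giving users a few concrete paths to start debugging from.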

##########
File path: aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java
##########
@@ -100,6 +115,67 @@ public void deleteFile(String path) {
     client().deleteObject(deleteRequest);
   }
 
+  /**
+   * Deletes the given paths in a batched manner.
+   * <p>
+   * The paths are grouped by bucket, and deletion is triggered when we either reach the configured batch size
+   * or have a final remainder batch for each bucket.
+   *
+   * @param paths paths to delete
+   */
+  @Override
+  public void deleteFiles(Iterable<String> paths) {
+    SetMultimap<String, String> bucketToObjects = Multimaps.newSetMultimap(Maps.newHashMap(), Sets::newHashSet);
+    List<String> failedDeletions = Lists.newArrayList();
+    for (String path : paths) {
+      S3URI location = new S3URI(path);
+      String bucket = location.bucket();
+      String objectKey = location.key();
+      Set<String> objectsInBucket = bucketToObjects.get(bucket);
+      if (objectsInBucket.size() == awsProperties.s3FileIoDeleteBatchSize()) {
+        List<String> failedDeletionsForBatch = deleteObjectsInBucket(bucket, objectsInBucket);
+        failedDeletions.addAll(failedDeletionsForBatch);
+        bucketToObjects.removeAll(bucket);
+      }
+      bucketToObjects.get(bucket).add(objectKey);
+    }
+
+    // Delete the remainder
+    List<List<String>> remainderFailedObjects = bucketToObjects
+        .asMap()
+        .entrySet()
+        .stream()
+        .map(entry -> deleteObjectsInBucket(entry.getKey(), entry.getValue()))
+        .collect(Collectors.toList());
+
+    remainderFailedObjects.forEach(failedDeletions::addAll);
+    if (!failedDeletions.isEmpty()) {
+      throw new FileIODeletionException(failedDeletions);
+    }
+  }
+
+  private List<String> deleteObjectsInBucket(String bucket, Collection<String> objects) {
+    if (!objects.isEmpty()) {
+      List<ObjectIdentifier> objectIds = objects
+          .stream()
+          .map(objectKey -> ObjectIdentifier.builder().key(objectKey).build())
+          .collect(Collectors.toList());
+      DeleteObjectsRequest deleteObjectsRequest = DeleteObjectsRequest.builder()
+          .bucket(bucket)
+          .delete(Delete.builder().objects(objectIds).build())
+          .build();
+      DeleteObjectsResponse response = client().deleteObjects(deleteObjectsRequest);
+      if (response.hasErrors()) {
+        return response.errors()
+            .stream()
+            .map(S3Error::key)
+            .collect(Collectors.toList());
+      }
+    }
+    return Lists.newArrayList();

Review comment:
       Updated




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


