gianm commented on code in PR #14131:
URL: https://github.com/apache/druid/pull/14131#discussion_r1263028335


##########
extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3DataSegmentKillerTest.java:
##########
@@ -347,7 +347,7 @@ public void 
test_kill_listOfSegments_multiDeleteExceptionIsThrown()
       segmentKiller.kill(ImmutableList.of(DATA_SEGMENT_1, DATA_SEGMENT_2));
     }
     catch (SegmentLoadingException exc) {
-      Assert.assertEquals("Couldn't delete from bucket: [test_bucket] these 
files [[key1/]]", exc.getMessage());
+      Assert.assertEquals("Couldn't delete segments from s3 see logs for more 
details", exc.getMessage());

Review Comment:
   You should use `Assert.assertThrows` rather than doing this. The current 
approach has the problem that if `segmentKiller.kill` _doesn't_ throw an 
exception for some reason, the test will still pass, because this 
`Assert.assertEquals` is never reached. But we want the test to fail if the 
expected exception isn't thrown.



##########
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentKiller.java:
##########
@@ -76,63 +77,114 @@ public S3DataSegmentKiller(
   @Override
   public void kill(List<DataSegment> segments) throws SegmentLoadingException
   {
-    int size = segments.size();
-    if (size == 0) {
+    if (segments.isEmpty()) {
       return;
     }
     if (segments.size() == 1) {
       kill(segments.get(0));
       return;
     }
 
-    // we can assume that all segments are in the same bucket.
-    String s3Bucket = MapUtils.getString(segments.get(0).getLoadSpec(), 
BUCKET);
-    final ServerSideEncryptingAmazonS3 s3Client = this.s3ClientSupplier.get();
+    // create a map of bucket to keys to delete
+    Map<String, List<DeleteObjectsRequest.KeyVersion>> bucketToKeysToDelete = 
new HashMap<>();
+    for (DataSegment segment : segments) {
+      String s3Bucket = MapUtils.getString(segment.getLoadSpec(), 
S3DataSegmentPuller.BUCKET);
+      String path = MapUtils.getString(segment.getLoadSpec(), 
S3DataSegmentPuller.KEY);
+      List<DeleteObjectsRequest.KeyVersion> keysToDelete = new ArrayList<>();
+      keysToDelete.add(new DeleteObjectsRequest.KeyVersion(path));
+      keysToDelete.add(new 
DeleteObjectsRequest.KeyVersion(DataSegmentKiller.descriptorPath(path)));
+      bucketToKeysToDelete.computeIfAbsent(s3Bucket, k -> new 
ArrayList<>()).addAll(keysToDelete);
+    }
 
-    List<DeleteObjectsRequest.KeyVersion> keysToDelete = segments.stream()
-            .map(segment -> MapUtils.getString(segment.getLoadSpec(), KEY))
-            .flatMap(path -> Stream.of(new 
DeleteObjectsRequest.KeyVersion(path),
-                                     new 
DeleteObjectsRequest.KeyVersion(DataSegmentKiller.descriptorPath(path))))
-            .collect(Collectors.toList());
+    final ServerSideEncryptingAmazonS3 s3Client = this.s3ClientSupplier.get();
+    boolean shouldThrowException = false;
+    for (Map.Entry<String, List<DeleteObjectsRequest.KeyVersion>> bucketToKeys 
: bucketToKeysToDelete.entrySet()) {
+      String s3Bucket = bucketToKeys.getKey();
+      List<DeleteObjectsRequest.KeyVersion> keysToDelete = 
bucketToKeys.getValue();
+      boolean hadException = deleteKeysForBucket(s3Client, s3Bucket, 
keysToDelete);
+      if (hadException) {
+        shouldThrowException = true;
+      }
+    }
+    if (shouldThrowException) {
+      // exception error message gets cutoff without providing any details. 
look at the logs for more details.
+      // this was a shortcut to handle the many different ways there could 
potentially be failures and handle them
+      // reasonably
+      throw new SegmentLoadingException(
+          "Couldn't delete segments from s3 see logs for more details"
+      );
+    }
+  }
 
-    // max delete object request size is 1000 for S3
-    List<List<DeleteObjectsRequest.KeyVersion>> keysChunks = 
Lists.partition(keysToDelete, 1000);
+  /**
+   * Delete all keys in a bucket from s3
+   *
+   * @param s3Client     client used to communicate with s3
+   * @param s3Bucket     the bucket where the keys exist
+   * @param keysToDelete the keys to delete
+   * @return a boolean value of true if there was an issue deleting one or 
many keys, a boolean value of false if
+   * successful
+   */
+  private boolean deleteKeysForBucket(
+      ServerSideEncryptingAmazonS3 s3Client,
+      String s3Bucket,
+      List<DeleteObjectsRequest.KeyVersion> keysToDelete
+  )
+  {
+    boolean hadException = false;
     DeleteObjectsRequest deleteObjectsRequest = new 
DeleteObjectsRequest(s3Bucket);
-    // only return objects failed to delete.
     deleteObjectsRequest.setQuiet(true);
-
-    List<String> keysNotDeleted = new ArrayList<>();
-    for (List<DeleteObjectsRequest.KeyVersion> keysChunk : keysChunks) {
-      List<String> keysToDeleteStrings = keysChunk.stream().map(
-            
DeleteObjectsRequest.KeyVersion::getKey).collect(Collectors.toList());
+    List<List<DeleteObjectsRequest.KeyVersion>> keysChunks = Lists.partition(
+        keysToDelete,
+        MAX_MULTI_OBJECT_DELETE_SIZE
+    );
+    for (List<DeleteObjectsRequest.KeyVersion> chunkOfKeys : keysChunks) {
+      List<String> keysToDeleteStrings = chunkOfKeys.stream().map(
+          
DeleteObjectsRequest.KeyVersion::getKey).collect(Collectors.toList());
       try {
-        deleteObjectsRequest.setKeys(keysChunk);
-        log.info("Removing from bucket: [%s] the following index files: [%s] 
from s3!", s3Bucket, keysToDeleteStrings);
+        deleteObjectsRequest.setKeys(chunkOfKeys);
+        log.info(
+            "Removing from bucket: [%s] the following index files: [%s] from 
s3!",
+            s3Bucket,
+            keysToDeleteStrings
+        );
         s3Client.deleteObjects(deleteObjectsRequest);
       }
       catch (MultiObjectDeleteException e) {
-        keysNotDeleted.addAll(e.getErrors().stream()
-                               
.map(MultiObjectDeleteException.DeleteError::getKey)
-                               .collect(Collectors.toList()));
+        hadException = true;
+        Map<String, List<String>> errorToKeys = new HashMap<>();
+        for (MultiObjectDeleteException.DeleteError error : e.getErrors()) {
+          error.getMessage();
+          error.getKey();
+          errorToKeys.computeIfAbsent(error.getMessage(), k -> new 
ArrayList<>()).add(error.getKey());
+        }
+        errorToKeys.forEach((key, value) -> log.error(
+            "Unable to delete from bucket [%s], the following keys [%s], 
because [%s]",
+            s3Bucket,
+            String.join(", ", value),
+            key
+        ));
       }
       catch (AmazonServiceException e) {
-        throw new SegmentLoadingException(e,
-                                          "Unable to delete from bucket [%s]",
-                                          s3Bucket);
+        hadException = true;
+        log.error(

Review Comment:
   When logging, it's best to put `e` in the first position rather than 
interpolate `e.getMessage()`. If your goal is to avoid showing a stack trace, 
you can do `log.noStackTrace().error(...)`. This is nice because if people 
enable debug-level logging, the stack traces are shown anyway, which gives 
admins a way to get the stack traces if needed.
   
   Also, a `warn` would be better here. Generally, `log.error` should be 
reserved for problems that are really serious and affect the entire service.



##########
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentKiller.java:
##########
@@ -76,63 +77,114 @@ public S3DataSegmentKiller(
   @Override
   public void kill(List<DataSegment> segments) throws SegmentLoadingException
   {
-    int size = segments.size();
-    if (size == 0) {
+    if (segments.isEmpty()) {
       return;
     }
     if (segments.size() == 1) {
       kill(segments.get(0));
       return;
     }
 
-    // we can assume that all segments are in the same bucket.
-    String s3Bucket = MapUtils.getString(segments.get(0).getLoadSpec(), 
BUCKET);
-    final ServerSideEncryptingAmazonS3 s3Client = this.s3ClientSupplier.get();
+    // create a map of bucket to keys to delete
+    Map<String, List<DeleteObjectsRequest.KeyVersion>> bucketToKeysToDelete = 
new HashMap<>();
+    for (DataSegment segment : segments) {
+      String s3Bucket = MapUtils.getString(segment.getLoadSpec(), 
S3DataSegmentPuller.BUCKET);
+      String path = MapUtils.getString(segment.getLoadSpec(), 
S3DataSegmentPuller.KEY);
+      List<DeleteObjectsRequest.KeyVersion> keysToDelete = new ArrayList<>();
+      keysToDelete.add(new DeleteObjectsRequest.KeyVersion(path));
+      keysToDelete.add(new 
DeleteObjectsRequest.KeyVersion(DataSegmentKiller.descriptorPath(path)));
+      bucketToKeysToDelete.computeIfAbsent(s3Bucket, k -> new 
ArrayList<>()).addAll(keysToDelete);
+    }
 
-    List<DeleteObjectsRequest.KeyVersion> keysToDelete = segments.stream()
-            .map(segment -> MapUtils.getString(segment.getLoadSpec(), KEY))
-            .flatMap(path -> Stream.of(new 
DeleteObjectsRequest.KeyVersion(path),
-                                     new 
DeleteObjectsRequest.KeyVersion(DataSegmentKiller.descriptorPath(path))))
-            .collect(Collectors.toList());
+    final ServerSideEncryptingAmazonS3 s3Client = this.s3ClientSupplier.get();
+    boolean shouldThrowException = false;
+    for (Map.Entry<String, List<DeleteObjectsRequest.KeyVersion>> bucketToKeys 
: bucketToKeysToDelete.entrySet()) {
+      String s3Bucket = bucketToKeys.getKey();
+      List<DeleteObjectsRequest.KeyVersion> keysToDelete = 
bucketToKeys.getValue();
+      boolean hadException = deleteKeysForBucket(s3Client, s3Bucket, 
keysToDelete);
+      if (hadException) {
+        shouldThrowException = true;
+      }
+    }
+    if (shouldThrowException) {
+      // exception error message gets cutoff without providing any details. 
look at the logs for more details.
+      // this was a shortcut to handle the many different ways there could 
potentially be failures and handle them
+      // reasonably
+      throw new SegmentLoadingException(
+          "Couldn't delete segments from s3 see logs for more details"

Review Comment:
   Please improve the grammar (missing punctuation) and capitalization ("s3" 
should be "S3").
   
   Also, can we interpolate some useful information into this error message, 
such as the number of segments, the first segment ID (or first N IDs) that 
couldn't be deleted, and the nature of the reason why they couldn't be deleted? 
This info could come back from `deleteKeysForBucket` rather than a `boolean`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to