TSFenwick commented on code in PR #14131:
URL: https://github.com/apache/druid/pull/14131#discussion_r1272641921


##########
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentKiller.java:
##########
@@ -64,13 +74,116 @@ public S3DataSegmentKiller(
     this.inputDataConfig = inputDataConfig;
   }
 
+  @Override
+  public void kill(List<DataSegment> segments) throws SegmentLoadingException
+  {
+    if (segments.isEmpty()) {
+      return;
+    }
+    if (segments.size() == 1) {
+      kill(segments.get(0));
+      return;
+    }
+
+    // create a map of bucket to keys to delete
+    Map<String, List<DeleteObjectsRequest.KeyVersion>> bucketToKeysToDelete = 
new HashMap<>();
+    for (DataSegment segment : segments) {
+      String s3Bucket = MapUtils.getString(segment.getLoadSpec(), 
S3DataSegmentPuller.BUCKET);
+      String path = MapUtils.getString(segment.getLoadSpec(), 
S3DataSegmentPuller.KEY);
+      List<DeleteObjectsRequest.KeyVersion> keysToDelete = new ArrayList<>();
+      keysToDelete.add(new DeleteObjectsRequest.KeyVersion(path));
+      keysToDelete.add(new 
DeleteObjectsRequest.KeyVersion(DataSegmentKiller.descriptorPath(path)));
+      bucketToKeysToDelete.computeIfAbsent(s3Bucket, k -> new 
ArrayList<>()).addAll(keysToDelete);
+    }
+
+    final ServerSideEncryptingAmazonS3 s3Client = this.s3ClientSupplier.get();
+    boolean shouldThrowException = false;
+    for (Map.Entry<String, List<DeleteObjectsRequest.KeyVersion>> bucketToKeys 
: bucketToKeysToDelete.entrySet()) {
+      String s3Bucket = bucketToKeys.getKey();
+      List<DeleteObjectsRequest.KeyVersion> keysToDelete = 
bucketToKeys.getValue();
+      boolean hadException = deleteKeysForBucket(s3Client, s3Bucket, 
keysToDelete);
+      if (hadException) {
+        shouldThrowException = true;
+      }
+    }
+    if (shouldThrowException) {
+      // exception error message gets cutoff without providing any details. 
look at the logs for more details.
+      // this was a shortcut to handle the many different ways there could 
potentially be failures and handle them
+      // reasonably
+      throw new SegmentLoadingException(
+          "Couldn't delete segments from S3. See the task logs for more 
details."
+      );
+    }
+  }
+
+  /**
+   * Delete all keys in a bucket from s3
+   *
+   * @param s3Client     client used to communicate with s3
+   * @param s3Bucket     the bucket where the keys exist
+   * @param keysToDelete the keys to delete
+   * @return a boolean value of true if there was an issue deleting one or 
many keys, a boolean value of false if
+   * succesful
+   */
+  private boolean deleteKeysForBucket(
+      ServerSideEncryptingAmazonS3 s3Client,
+      String s3Bucket,
+      List<DeleteObjectsRequest.KeyVersion> keysToDelete
+  )
+  {
+    boolean hadException = false;
+    DeleteObjectsRequest deleteObjectsRequest = new 
DeleteObjectsRequest(s3Bucket);
+    deleteObjectsRequest.setQuiet(true);
+    List<List<DeleteObjectsRequest.KeyVersion>> keysChunks = Lists.partition(
+        keysToDelete,
+        MAX_MULTI_OBJECT_DELETE_SIZE
+    );
+    for (List<DeleteObjectsRequest.KeyVersion> chunkOfKeys : keysChunks) {
+      List<String> keysToDeleteStrings = chunkOfKeys.stream().map(
+          
DeleteObjectsRequest.KeyVersion::getKey).collect(Collectors.toList());
+      try {
+        deleteObjectsRequest.setKeys(chunkOfKeys);
+        log.info(
+            "Removing from bucket: [%s] the following index files: [%s] from 
s3!",
+            s3Bucket,
+            keysToDeleteStrings
+        );
+        s3Client.deleteObjects(deleteObjectsRequest);
+      }
+      catch (MultiObjectDeleteException e) {
+        hadException = true;
+        Map<String, List<String>> errorToKeys = new HashMap<>();
+        for (MultiObjectDeleteException.DeleteError error : e.getErrors()) {
+          error.getMessage();
+          error.getKey();
+          errorToKeys.computeIfAbsent(error.getMessage(), k -> new 
ArrayList<>()).add(error.getKey());
+        }
+        errorToKeys.forEach((key, value) -> log.error(
+            "Unable to delete from bucket [%s], the following keys [%s], 
because [%s]",
+            s3Bucket,
+            String.join(", ", value),
+            key
+        ));
+      }
+      catch (AmazonServiceException e) {
+        hadException = true;
+        log.noStackTrace().warn(e,
+            "Unable to delete from bucket [%s], the following keys [%s]",
+            s3Bucket,
+            
chunkOfKeys.stream().map(DeleteObjectsRequest.KeyVersion::getKey).collect(Collectors.joining(",
 "))
+        );
+      }
+    }
+    return hadException;
+  }
+
   @Override
   public void kill(DataSegment segment) throws SegmentLoadingException

Review Comment:
   I left that alone, since we can make that change at a later date once this has been more fully vetted.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to