jasonk000 commented on code in PR #14131:
URL: https://github.com/apache/druid/pull/14131#discussion_r1272591391
##########
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentKiller.java:
##########
@@ -64,13 +74,116 @@ public S3DataSegmentKiller(
this.inputDataConfig = inputDataConfig;
}
+ @Override
+ public void kill(List<DataSegment> segments) throws SegmentLoadingException
+ {
+ if (segments.isEmpty()) {
+ return;
+ }
+ if (segments.size() == 1) {
+ kill(segments.get(0));
+ return;
+ }
+
+ // create a map of bucket to keys to delete
+ Map<String, List<DeleteObjectsRequest.KeyVersion>> bucketToKeysToDelete =
new HashMap<>();
+ for (DataSegment segment : segments) {
+ String s3Bucket = MapUtils.getString(segment.getLoadSpec(),
S3DataSegmentPuller.BUCKET);
+ String path = MapUtils.getString(segment.getLoadSpec(),
S3DataSegmentPuller.KEY);
+ List<DeleteObjectsRequest.KeyVersion> keysToDelete = new ArrayList<>();
+ keysToDelete.add(new DeleteObjectsRequest.KeyVersion(path));
+ keysToDelete.add(new
DeleteObjectsRequest.KeyVersion(DataSegmentKiller.descriptorPath(path)));
+ bucketToKeysToDelete.computeIfAbsent(s3Bucket, k -> new
ArrayList<>()).addAll(keysToDelete);
Review Comment:
We can avoid the extra temporary ArrayList like this:
```
List<DeleteObjectsRequest.KeyVersion> keysToDelete =
bucketToKeysToDelete.computeIfAbsent(s3Bucket, k -> new ArrayList<>());
keysToDelete.add(new DeleteObjectsRequest.KeyVersion(path));
keysToDelete.add(new
DeleteObjectsRequest.KeyVersion(DataSegmentKiller.descriptorPath(path)));
```
##########
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentKiller.java:
##########
@@ -64,13 +74,116 @@ public S3DataSegmentKiller(
this.inputDataConfig = inputDataConfig;
}
+ @Override
+ public void kill(List<DataSegment> segments) throws SegmentLoadingException
+ {
+ if (segments.isEmpty()) {
+ return;
+ }
+ if (segments.size() == 1) {
+ kill(segments.get(0));
+ return;
+ }
+
+ // create a map of bucket to keys to delete
+ Map<String, List<DeleteObjectsRequest.KeyVersion>> bucketToKeysToDelete =
new HashMap<>();
+ for (DataSegment segment : segments) {
+ String s3Bucket = MapUtils.getString(segment.getLoadSpec(),
S3DataSegmentPuller.BUCKET);
+ String path = MapUtils.getString(segment.getLoadSpec(),
S3DataSegmentPuller.KEY);
+ List<DeleteObjectsRequest.KeyVersion> keysToDelete = new ArrayList<>();
+ keysToDelete.add(new DeleteObjectsRequest.KeyVersion(path));
+ keysToDelete.add(new
DeleteObjectsRequest.KeyVersion(DataSegmentKiller.descriptorPath(path)));
+ bucketToKeysToDelete.computeIfAbsent(s3Bucket, k -> new
ArrayList<>()).addAll(keysToDelete);
+ }
+
+ final ServerSideEncryptingAmazonS3 s3Client = this.s3ClientSupplier.get();
+ boolean shouldThrowException = false;
+ for (Map.Entry<String, List<DeleteObjectsRequest.KeyVersion>> bucketToKeys
: bucketToKeysToDelete.entrySet()) {
+ String s3Bucket = bucketToKeys.getKey();
+ List<DeleteObjectsRequest.KeyVersion> keysToDelete =
bucketToKeys.getValue();
+ boolean hadException = deleteKeysForBucket(s3Client, s3Bucket,
keysToDelete);
+ if (hadException) {
+ shouldThrowException = true;
+ }
+ }
+ if (shouldThrowException) {
+ // exception error message gets cut off without providing any details.
look at the logs for more details.
+ // this was a shortcut to handle the many different ways there could
potentially be failures and handle them
+ // reasonably
+ throw new SegmentLoadingException(
+ "Couldn't delete segments from S3. See the task logs for more
details."
+ );
+ }
+ }
+
+ /**
+ * Delete all keys in a bucket from s3
+ *
+ * @param s3Client client used to communicate with s3
+ * @param s3Bucket the bucket where the keys exist
+ * @param keysToDelete the keys to delete
+ * @return a boolean value of true if there was an issue deleting one or
many keys, a boolean value of false if
+ * successful
+ */
+ private boolean deleteKeysForBucket(
+ ServerSideEncryptingAmazonS3 s3Client,
+ String s3Bucket,
+ List<DeleteObjectsRequest.KeyVersion> keysToDelete
+ )
+ {
+ boolean hadException = false;
+ DeleteObjectsRequest deleteObjectsRequest = new
DeleteObjectsRequest(s3Bucket);
+ deleteObjectsRequest.setQuiet(true);
+ List<List<DeleteObjectsRequest.KeyVersion>> keysChunks = Lists.partition(
+ keysToDelete,
+ MAX_MULTI_OBJECT_DELETE_SIZE
+ );
+ for (List<DeleteObjectsRequest.KeyVersion> chunkOfKeys : keysChunks) {
+ List<String> keysToDeleteStrings = chunkOfKeys.stream().map(
+
DeleteObjectsRequest.KeyVersion::getKey).collect(Collectors.toList());
+ try {
+ deleteObjectsRequest.setKeys(chunkOfKeys);
+ log.info(
+ "Removing from bucket: [%s] the following index files: [%s] from
s3!",
+ s3Bucket,
+ keysToDeleteStrings
+ );
+ s3Client.deleteObjects(deleteObjectsRequest);
+ }
+ catch (MultiObjectDeleteException e) {
+ hadException = true;
+ Map<String, List<String>> errorToKeys = new HashMap<>();
+ for (MultiObjectDeleteException.DeleteError error : e.getErrors()) {
+ error.getMessage();
+ error.getKey();
+ errorToKeys.computeIfAbsent(error.getMessage(), k -> new
ArrayList<>()).add(error.getKey());
+ }
+ errorToKeys.forEach((key, value) -> log.error(
+ "Unable to delete from bucket [%s], the following keys [%s],
because [%s]",
+ s3Bucket,
+ String.join(", ", value),
+ key
+ ));
+ }
+ catch (AmazonServiceException e) {
+ hadException = true;
+ log.noStackTrace().warn(e,
+ "Unable to delete from bucket [%s], the following keys [%s]",
+ s3Bucket,
+
chunkOfKeys.stream().map(DeleteObjectsRequest.KeyVersion::getKey).collect(Collectors.joining(",
"))
+ );
+ }
+ }
+ return hadException;
+ }
+
@Override
public void kill(DataSegment segment) throws SegmentLoadingException
Review Comment:
Is there any reason to keep this specifically? I much prefer your bulk
implementation.
We could remove it completely:
```
public void kill(DataSegment segment) throws SegmentLoadingException
{
kill(Collections.singletonList(segment));
}
```
Alternatively, we can avoid the call to `doesObjectExist` and issue the
delete directly, since same outcome is achieved.
##########
processing/src/main/java/org/apache/druid/segment/loading/DataSegmentKiller.java:
##########
@@ -54,6 +55,29 @@ static String descriptorPath(String path)
*/
void kill(DataSegment segment) throws SegmentLoadingException;
+ /**
+ * Kills a list of segments from deep storage. The default implementation
calls kill on the segments in a loop.
+ * Implementers of this interface can leverage batch / bulk deletes to be
more efficient. It is preferable to attempt
+ * to delete all segments even if there is an issue with deleting a single
one. This is up to implementers to
+ * implement as putting a try catch around the default kill via iteration
can be problematic if the client of the deep
+ * storage is unable to authenticate itself and segment loading exception
doesn't encode enough information in it to
+ * understand why it failed.
+ * <p>
+ * If one or more segments do not exist in deep storage, the method
should not throw an exception.
+ * <p>
+ * This version of kill must **NOT** require additional permissions on the
deep storage beyond what
+ * {@link #kill(DataSegment)} requires.
+ * @param segments The list of segments to kill.
+ * @throws SegmentLoadingException If there is an exception during deletion
such as a segment in the list could not be
+ * completely removed.
+ */
+ default void kill(List<DataSegment> segments) throws SegmentLoadingException
Review Comment:
for example
[here](https://github.com/apache/druid/blob/28914bbab82480f33a345913a30c32321d36fc37/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java#L505)
##########
processing/src/main/java/org/apache/druid/segment/loading/DataSegmentKiller.java:
##########
@@ -54,6 +55,29 @@ static String descriptorPath(String path)
*/
void kill(DataSegment segment) throws SegmentLoadingException;
+ /**
+ * Kills a list of segments from deep storage. The default implementation
calls kill on the segments in a loop.
+ * Implementers of this interface can leverage batch / bulk deletes to be
more efficient. It is preferable to attempt
+ * to delete all segments even if there is an issue with deleting a single
one. This is up to implementers to
+ * implement as putting a try catch around the default kill via iteration
can be problematic if the client of the deep
+ * storage is unable to authenticate itself and segment loading exception
doesn't encode enough information in it to
+ * understand why it failed.
+ * <p>
+ * If one or more segments do not exist in deep storage, the method
should not throw an exception.
+ * <p>
+ * This version of kill must **NOT** require additional permissions on the
deep storage beyond what
+ * {@link #kill(DataSegment)} requires.
+ * @param segments The list of segments to kill.
+ * @throws SegmentLoadingException If there is an exception during deletion
such as a segment in the list could not be
+ * completely removed.
+ */
+ default void kill(List<DataSegment> segments) throws SegmentLoadingException
Review Comment:
We could consider extending this to `killQuietly`, as that is called from
the appenderator layer for multiple segments at a time.
##########
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentKiller.java:
##########
@@ -64,13 +74,116 @@ public S3DataSegmentKiller(
this.inputDataConfig = inputDataConfig;
}
+ @Override
+ public void kill(List<DataSegment> segments) throws SegmentLoadingException
+ {
+ if (segments.isEmpty()) {
+ return;
+ }
+ if (segments.size() == 1) {
+ kill(segments.get(0));
+ return;
+ }
+
+ // create a map of bucket to keys to delete
+ Map<String, List<DeleteObjectsRequest.KeyVersion>> bucketToKeysToDelete =
new HashMap<>();
+ for (DataSegment segment : segments) {
+ String s3Bucket = MapUtils.getString(segment.getLoadSpec(),
S3DataSegmentPuller.BUCKET);
+ String path = MapUtils.getString(segment.getLoadSpec(),
S3DataSegmentPuller.KEY);
+ List<DeleteObjectsRequest.KeyVersion> keysToDelete = new ArrayList<>();
+ keysToDelete.add(new DeleteObjectsRequest.KeyVersion(path));
+ keysToDelete.add(new
DeleteObjectsRequest.KeyVersion(DataSegmentKiller.descriptorPath(path)));
+ bucketToKeysToDelete.computeIfAbsent(s3Bucket, k -> new
ArrayList<>()).addAll(keysToDelete);
+ }
+
+ final ServerSideEncryptingAmazonS3 s3Client = this.s3ClientSupplier.get();
+ boolean shouldThrowException = false;
+ for (Map.Entry<String, List<DeleteObjectsRequest.KeyVersion>> bucketToKeys
: bucketToKeysToDelete.entrySet()) {
+ String s3Bucket = bucketToKeys.getKey();
+ List<DeleteObjectsRequest.KeyVersion> keysToDelete =
bucketToKeys.getValue();
+ boolean hadException = deleteKeysForBucket(s3Client, s3Bucket,
keysToDelete);
+ if (hadException) {
+ shouldThrowException = true;
+ }
+ }
+ if (shouldThrowException) {
+ // exception error message gets cut off without providing any details.
look at the logs for more details.
+ // this was a shortcut to handle the many different ways there could
potentially be failures and handle them
+ // reasonably
+ throw new SegmentLoadingException(
+ "Couldn't delete segments from S3. See the task logs for more
details."
+ );
+ }
+ }
+
+ /**
+ * Delete all keys in a bucket from s3
+ *
+ * @param s3Client client used to communicate with s3
+ * @param s3Bucket the bucket where the keys exist
+ * @param keysToDelete the keys to delete
+ * @return a boolean value of true if there was an issue deleting one or
many keys, a boolean value of false if
+ * successful
+ */
+ private boolean deleteKeysForBucket(
+ ServerSideEncryptingAmazonS3 s3Client,
+ String s3Bucket,
+ List<DeleteObjectsRequest.KeyVersion> keysToDelete
+ )
+ {
+ boolean hadException = false;
+ DeleteObjectsRequest deleteObjectsRequest = new
DeleteObjectsRequest(s3Bucket);
+ deleteObjectsRequest.setQuiet(true);
+ List<List<DeleteObjectsRequest.KeyVersion>> keysChunks = Lists.partition(
+ keysToDelete,
+ MAX_MULTI_OBJECT_DELETE_SIZE
+ );
+ for (List<DeleteObjectsRequest.KeyVersion> chunkOfKeys : keysChunks) {
+ List<String> keysToDeleteStrings = chunkOfKeys.stream().map(
+
DeleteObjectsRequest.KeyVersion::getKey).collect(Collectors.toList());
+ try {
+ deleteObjectsRequest.setKeys(chunkOfKeys);
+ log.info(
+ "Removing from bucket: [%s] the following index files: [%s] from
s3!",
+ s3Bucket,
+ keysToDeleteStrings
+ );
+ s3Client.deleteObjects(deleteObjectsRequest);
+ }
+ catch (MultiObjectDeleteException e) {
+ hadException = true;
+ Map<String, List<String>> errorToKeys = new HashMap<>();
+ for (MultiObjectDeleteException.DeleteError error : e.getErrors()) {
+ error.getMessage();
+ error.getKey();
+ errorToKeys.computeIfAbsent(error.getMessage(), k -> new
ArrayList<>()).add(error.getKey());
+ }
+ errorToKeys.forEach((key, value) -> log.error(
+ "Unable to delete from bucket [%s], the following keys [%s],
because [%s]",
+ s3Bucket,
+ String.join(", ", value),
+ key
+ ));
+ }
+ catch (AmazonServiceException e) {
+ hadException = true;
+ log.noStackTrace().warn(e,
Review Comment:
IMO this should be error, not warn.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]