TSFenwick commented on code in PR #14131:
URL: https://github.com/apache/druid/pull/14131#discussion_r1271017429
##########
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentKiller.java:
##########
@@ -76,63 +77,114 @@ public S3DataSegmentKiller(
@Override
public void kill(List<DataSegment> segments) throws SegmentLoadingException
{
- int size = segments.size();
- if (size == 0) {
+ if (segments.isEmpty()) {
return;
}
if (segments.size() == 1) {
kill(segments.get(0));
return;
}
- // we can assume that all segments are in the same bucket.
- String s3Bucket = MapUtils.getString(segments.get(0).getLoadSpec(), BUCKET);
- final ServerSideEncryptingAmazonS3 s3Client = this.s3ClientSupplier.get();
+ // create a map of bucket to keys to delete
+ Map<String, List<DeleteObjectsRequest.KeyVersion>> bucketToKeysToDelete = new HashMap<>();
+ for (DataSegment segment : segments) {
+ String s3Bucket = MapUtils.getString(segment.getLoadSpec(), S3DataSegmentPuller.BUCKET);
+ String path = MapUtils.getString(segment.getLoadSpec(), S3DataSegmentPuller.KEY);
+ List<DeleteObjectsRequest.KeyVersion> keysToDelete = new ArrayList<>();
+ keysToDelete.add(new DeleteObjectsRequest.KeyVersion(path));
+ keysToDelete.add(new DeleteObjectsRequest.KeyVersion(DataSegmentKiller.descriptorPath(path)));
+ bucketToKeysToDelete.computeIfAbsent(s3Bucket, k -> new ArrayList<>()).addAll(keysToDelete);
+ }
- List<DeleteObjectsRequest.KeyVersion> keysToDelete = segments.stream()
- .map(segment -> MapUtils.getString(segment.getLoadSpec(), KEY))
- .flatMap(path -> Stream.of(new DeleteObjectsRequest.KeyVersion(path),
- new DeleteObjectsRequest.KeyVersion(DataSegmentKiller.descriptorPath(path))))
- .collect(Collectors.toList());
+ final ServerSideEncryptingAmazonS3 s3Client = this.s3ClientSupplier.get();
+ boolean shouldThrowException = false;
+ for (Map.Entry<String, List<DeleteObjectsRequest.KeyVersion>> bucketToKeys : bucketToKeysToDelete.entrySet()) {
+ String s3Bucket = bucketToKeys.getKey();
+ List<DeleteObjectsRequest.KeyVersion> keysToDelete = bucketToKeys.getValue();
+ boolean hadException = deleteKeysForBucket(s3Client, s3Bucket, keysToDelete);
+ if (hadException) {
+ shouldThrowException = true;
+ }
+ }
+ if (shouldThrowException) {
+ // exception error message gets cutoff without providing any details. look at the logs for more details.
+ // this was a shortcut to handle the many different ways there could potentially be failures and handle them
+ // reasonably
+ throw new SegmentLoadingException(
+ "Couldn't delete segments from s3 see logs for more details"
Review Comment:
@gianm The reason I didn't do that is that all of this information gets
truncated in the status of the delete task. So I figured it wasn't worth the
complication of corralling that state and presenting it in a meaningful way if
it ends up truncated anyway.
This is what it actually looks like in the status object:
```
"errorMsg": "org.apache.druid.segment.loading.SegmentLoadingException: Couldn't delete from bucket: [tomfenwick-t..."
```
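If the details were kept in the exception, one option is to lead the message with a short summary so that truncation drops the bucket list rather than the summary. In the status above, roughly the first 100 characters survived, so a short summary placed right after the exception class name would likely remain visible. A minimal sketch, assuming a hypothetical `buildMessage` helper and a hypothetical map of failed keys per bucket (neither is part of the PR); the exact status truncation rule is not known here, so the code does not model it:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class DeleteFailureSummary
{
  // Hypothetical helper: the summary comes first so that a truncated status
  // message still says how many buckets failed; bucket names come last.
  static String buildMessage(Map<String, List<String>> failedKeysByBucket, int totalBuckets)
  {
    return "Failed to delete from " + failedKeysByBucket.size()
           + " of " + totalBuckets + " bucket(s); see logs for details. Buckets: "
           + new ArrayList<>(failedKeysByBucket.keySet());
  }

  public static void main(String[] args)
  {
    // Hypothetical failure data: one bucket out of three had keys that could not be deleted.
    Map<String, List<String>> failed = new LinkedHashMap<>();
    failed.put("bucket-a", Arrays.asList("seg1/index.zip", "seg1/descriptor.json"));
    // Prints: Failed to delete from 1 of 3 bucket(s); see logs for details. Buckets: [bucket-a]
    System.out.println(buildMessage(failed, 3));
  }
}
```

The per-key failures stay in the logs; only the count and bucket names go into the message.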
That being said, I can still make those changes.
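For reference, the grouping step in the `kill` diff above reduces to a group-by-bucket pattern. The sketch below is plain Java with no AWS SDK. Load specs are hypothetical `Map<String, String>`s with assumed `bucket` and `key` entries. `descriptorPathFor` is a hypothetical stand-in for `DataSegmentKiller.descriptorPath` that assumes the descriptor sits next to the segment file. The batching step reflects the documented S3 limit of 1,000 keys per `DeleteObjects` request, not anything shown in the diff:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class BucketGrouping
{
  // S3 DeleteObjects accepts at most 1,000 keys per request.
  static final int MAX_KEYS_PER_DELETE = 1000;

  // Hypothetical stand-in for DataSegmentKiller.descriptorPath: assumes the
  // descriptor is "descriptor.json" in the same directory as the segment file.
  static String descriptorPathFor(String path)
  {
    int slash = path.lastIndexOf('/');
    return slash >= 0 ? path.substring(0, slash + 1) + "descriptor.json" : "descriptor.json";
  }

  // Groups each segment's data key and descriptor key under its bucket.
  static Map<String, List<String>> groupKeysByBucket(List<Map<String, String>> loadSpecs)
  {
    Map<String, List<String>> keysByBucket = new LinkedHashMap<>();
    for (Map<String, String> loadSpec : loadSpecs) {
      String bucket = loadSpec.get("bucket"); // assumed load spec entry
      String path = loadSpec.get("key");      // assumed load spec entry
      List<String> keys = keysByBucket.computeIfAbsent(bucket, b -> new ArrayList<>());
      keys.add(path);
      keys.add(descriptorPathFor(path));
    }
    return keysByBucket;
  }

  // Splits one bucket's keys into consecutive batches of at most maxBatch keys.
  static List<List<String>> batches(List<String> keys, int maxBatch)
  {
    List<List<String>> result = new ArrayList<>();
    for (int i = 0; i < keys.size(); i += maxBatch) {
      result.add(keys.subList(i, Math.min(i + maxBatch, keys.size())));
    }
    return result;
  }

  // Builds a hypothetical load spec for the demo below.
  static Map<String, String> spec(String bucket, String key)
  {
    Map<String, String> m = new HashMap<>();
    m.put("bucket", bucket);
    m.put("key", key);
    return m;
  }

  public static void main(String[] args)
  {
    List<Map<String, String>> specs = Arrays.asList(
        spec("bucket-a", "ds/seg1/index.zip"),
        spec("bucket-b", "ds/seg2/index.zip"),
        spec("bucket-a", "ds/seg3/index.zip")
    );
    for (Map.Entry<String, List<String>> e : groupKeysByBucket(specs).entrySet()) {
      for (List<String> batch : batches(e.getValue(), MAX_KEYS_PER_DELETE)) {
        // A real implementation would issue one DeleteObjects call per batch here.
        System.out.println(e.getKey() + " -> " + batch);
      }
    }
  }
}
```

`LinkedHashMap` is used instead of the diff's `HashMap` only so the printed order is deterministic; it does not change the grouping.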