clintropolis commented on a change in pull request #9459: Ability to Delete task logs and segments from S3
URL: https://github.com/apache/druid/pull/9459#discussion_r388682379
##########
File path:
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentKiller.java
##########
@@ -69,8 +85,48 @@ public void kill(DataSegment segment) throws SegmentLoadingException
}
@Override
- public void killAll()
+ public void killAll() throws IOException
{
- throw new UnsupportedOperationException("not implemented");
+ try {
+ S3Utils.retryS3Operation(
+ () -> {
+ String bucketName = segmentPusherConfig.getBucket();
Review comment:
Hmm, this block is almost identical to the block in
`S3TaskLogs.killOlderThan`, and both largely duplicate what
`ObjectSummaryIterator` already provides.
I think it would be nicer if this class and `S3TaskLogs` both used
`ObjectSummaryIterator`, and if the bulk key delete were moved into a
shared method in `S3Utils`.
Can I recommend something like this in `S3Utils`?
```java
public static void deleteObjectsInPath(
    ServerSideEncryptingAmazonS3 s3Client,
    S3InputDataConfig config,
    String bucket,
    String prefix,
    Predicate<S3ObjectSummary> filter
)
    throws Exception
{
  final List<DeleteObjectsRequest.KeyVersion> keysToDelete = new ArrayList<>(config.getMaxListingLength());
  final ObjectSummaryIterator iterator = new ObjectSummaryIterator(
      s3Client,
      ImmutableList.of(new CloudObjectLocation(bucket, prefix).toUri("s3")),
      config.getMaxListingLength()
  );

  while (iterator.hasNext()) {
    final S3ObjectSummary nextObject = iterator.next();
    if (filter.apply(nextObject)) {
      keysToDelete.add(new DeleteObjectsRequest.KeyVersion(nextObject.getKey()));
      if (keysToDelete.size() == config.getMaxListingLength()) {
        deleteBucketKeys(s3Client, bucket, keysToDelete);
        keysToDelete.clear();
      }
    }
  }

  if (keysToDelete.size() > 0) {
    deleteBucketKeys(s3Client, bucket, keysToDelete);
  }
}

public static void deleteBucketKeys(
    ServerSideEncryptingAmazonS3 s3Client,
    String bucket,
    List<DeleteObjectsRequest.KeyVersion> keysToDelete
)
    throws Exception
{
  DeleteObjectsRequest deleteRequest = new DeleteObjectsRequest(bucket).withKeys(keysToDelete);
  S3Utils.retryS3Operation(() -> {
    s3Client.deleteObjects(deleteRequest);
    return null;
  });
}
```
This way, all the code for listing objects is shared, and the retry is
pushed down to the individual list and delete API calls instead of
wrapping the entire loop, which I think is better.
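To make the retry placement concrete, here is a minimal, self-contained sketch that doesn't touch the AWS SDK. Everything in it is hypothetical: `PerCallRetrySketch`, `retry`, and `deleteInBatches` are illustrative names, `retry` is only a stand-in for `S3Utils.retryS3Operation` (it doesn't reproduce its backoff or exception filtering), and plain `String` keys stand in for `DeleteObjectsRequest.KeyVersion`.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.function.Consumer;

class PerCallRetrySketch
{
  // Hypothetical stand-in for S3Utils.retryS3Operation: retries one call, no backoff.
  public static <T> T retry(Callable<T> call, int maxTries) throws Exception
  {
    if (maxTries < 1) {
      throw new IllegalArgumentException("maxTries must be at least 1");
    }
    Exception last = null;
    for (int attempt = 1; attempt <= maxTries; attempt++) {
      try {
        return call.call();
      }
      catch (Exception e) {
        last = e; // remember the failure and try again
      }
    }
    throw last;
  }

  // Collects keys into batches and retries each delete request on its own,
  // so a transient failure repeats one request rather than the whole loop.
  // Returns the number of delete requests that succeeded.
  public static int deleteInBatches(
      Iterator<String> keys,
      int batchSize,
      Consumer<List<String>> deleteCall,
      int maxTries
  ) throws Exception
  {
    final List<String> batch = new ArrayList<>(batchSize);
    int requests = 0;
    while (keys.hasNext()) {
      batch.add(keys.next());
      if (batch.size() == batchSize) {
        flush(batch, deleteCall, maxTries);
        requests++;
      }
    }
    if (!batch.isEmpty()) {
      flush(batch, deleteCall, maxTries);
      requests++;
    }
    return requests;
  }

  private static void flush(List<String> batch, Consumer<List<String>> deleteCall, int maxTries)
      throws Exception
  {
    // Copy the batch so the request is unaffected when the buffer is cleared.
    final List<String> snapshot = new ArrayList<>(batch);
    retry(() -> {
      deleteCall.accept(snapshot);
      return null;
    }, maxTries);
    batch.clear();
  }
}
```

With the retry around the whole loop, a failure on a late batch would restart the listing from the beginning; in this arrangement only the failing request is repeated.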
If you make a change like this, then the kill calls become something like:
```java
@Override
public void killAll() throws IOException
{
  try {
    S3Utils.deleteObjectsInPath(
        s3Client,
        inputDataConfig,
        segmentPusherConfig.getBucket(),
        segmentPusherConfig.getBaseKey(),
        Predicates.alwaysTrue()
    );
  }
  catch (Exception e) {
    log.error("Error occurred while deleting segment files from s3. Error: %s", e.getMessage());
    throw new IOException(e);
  }
}
```
and
```java
@Override
public void killOlderThan(long timestamp) throws IOException
{
  try {
    S3Utils.deleteObjectsInPath(
        service,
        inputDataConfig,
        config.getS3Bucket(),
        config.getS3Prefix(),
        (object) -> object.getLastModified().getTime() < timestamp
    );
  }
  catch (Exception e) {
    log.error("Error occurred while deleting task log files from s3. Error: %s", e.getMessage());
    throw new IOException(e);
  }
}
```
`ObjectSummaryIterator` currently skips anything it thinks is a
directory. If this is _not_ desirable for deleting segments and task logs,
I would suggest pushing the predicate down into `ObjectSummaryIterator`,
so that existing users can filter out directories and your usages can
either get everything or filter by timestamp as appropriate. In addition,
if it shouldn't skip directories, it would probably be worth adding a test
for what happens when one is present in the list objects response, if
there isn't one already.
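As a rough illustration of pushing the predicate down, here is a self-contained sketch of an iterator that takes the filter as a constructor argument. It is not the real `ObjectSummaryIterator`: `FilteringIteratorSketch`, `Entry`, and `looksLikeDirectory` are hypothetical names, the directory check (zero size and a key ending in `/`) is an assumption rather than the actual rule, and it uses `java.util.function.Predicate` (`test`) where the snippets above use Guava's (`apply`).

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.function.Predicate;

class FilteringIteratorSketch<T> implements Iterator<T>
{
  // Hypothetical stand-in for S3ObjectSummary.
  public static final class Entry
  {
    public final String key;
    public final long size;
    public final long lastModifiedMillis;

    public Entry(String key, long size, long lastModifiedMillis)
    {
      this.key = key;
      this.size = size;
      this.lastModifiedMillis = lastModifiedMillis;
    }
  }

  // Assumed directory heuristic, for illustration only.
  public static boolean looksLikeDirectory(Entry entry)
  {
    return entry.size == 0 && entry.key.endsWith("/");
  }

  private final Iterator<T> delegate;
  private final Predicate<? super T> filter;
  private T nextItem;
  private boolean hasNextItem;

  public FilteringIteratorSketch(Iterator<T> delegate, Predicate<? super T> filter)
  {
    this.delegate = delegate;
    this.filter = filter;
    advance();
  }

  // Moves to the next element accepted by the filter, if there is one.
  private void advance()
  {
    hasNextItem = false;
    nextItem = null;
    while (delegate.hasNext()) {
      final T candidate = delegate.next();
      if (filter.test(candidate)) {
        nextItem = candidate;
        hasNextItem = true;
        return;
      }
    }
  }

  @Override
  public boolean hasNext()
  {
    return hasNextItem;
  }

  @Override
  public T next()
  {
    if (!hasNextItem) {
      throw new NoSuchElementException();
    }
    final T result = nextItem;
    advance();
    return result;
  }
}
```

Under this shape, existing callers would pass something like `e -> !looksLikeDirectory(e)`, `killAll` would pass an always-true predicate, and `killOlderThan` would pass a timestamp comparison.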
Disclaimer: I didn't test this against real S3. I did run the unit tests,
which passed after some modifications to the expected calls, but I would
still recommend reviewing these snippets first to make sure they are correct.
What do you think?