clintropolis commented on a change in pull request #9459: Ability to Delete task logs and segments from S3
URL: https://github.com/apache/druid/pull/9459#discussion_r388682379
 
 

 ##########
 File path: extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentKiller.java
 ##########
 @@ -69,8 +85,48 @@ public void kill(DataSegment segment) throws SegmentLoadingException
   }
 
   @Override
-  public void killAll()
+  public void killAll() throws IOException
   {
-    throw new UnsupportedOperationException("not implemented");
+    try {
+      S3Utils.retryS3Operation(
+          () -> {
+            String bucketName = segmentPusherConfig.getBucket();
 
 Review comment:
  Hmm, this block is almost identical to the block in `S3TaskLogs.killOlderThan`, and both are very close to what `ObjectSummaryIterator` already provides.
   
  I think it would be nicer if this class and `S3TaskLogs` could both use `ObjectSummaryIterator`, and if the bulk key delete could be pulled into a shared method in `S3Utils`.
   
   Can I recommend something like this in `S3Utils`?
   ```java
     public static void deleteObjectsInPath(
         ServerSideEncryptingAmazonS3 s3Client,
         S3InputDataConfig config,
         String bucket,
         String prefix,
         Predicate<S3ObjectSummary> filter
     )
         throws Exception
     {
      final List<DeleteObjectsRequest.KeyVersion> keysToDelete = new ArrayList<>(config.getMaxListingLength());
       final ObjectSummaryIterator iterator = new ObjectSummaryIterator(
           s3Client,
          ImmutableList.of(new CloudObjectLocation(bucket, prefix).toUri("s3")),
           config.getMaxListingLength()
       );
   
       while (iterator.hasNext()) {
         final S3ObjectSummary nextObject = iterator.next();
         if (filter.apply(nextObject)) {
          keysToDelete.add(new DeleteObjectsRequest.KeyVersion(nextObject.getKey()));
           if (keysToDelete.size() == config.getMaxListingLength()) {
             deleteBucketKeys(s3Client, bucket, keysToDelete);
             keysToDelete.clear();
           }
         }
       }
   
       if (keysToDelete.size() > 0) {
         deleteBucketKeys(s3Client, bucket, keysToDelete);
       }
     }
   
     public static void deleteBucketKeys(
         ServerSideEncryptingAmazonS3 s3Client,
         String bucket,
         List<DeleteObjectsRequest.KeyVersion> keysToDelete
     )
         throws Exception
     {
      DeleteObjectsRequest deleteRequest = new DeleteObjectsRequest(bucket).withKeys(keysToDelete);
       S3Utils.retryS3Operation(() -> {
         s3Client.deleteObjects(deleteRequest);
         return null;
       });
     }
   ```
  Then, not only does it share all of the code for listing objects, it also pushes the retry down to the individual list and delete API calls instead of wrapping the entire loop, which I think is better.
   
   If you make a change like this, then the kill calls become something like:
   ```java
     @Override
     public void killAll() throws IOException
     {
       try {
         S3Utils.deleteObjectsInPath(
             s3Client,
             inputDataConfig,
             segmentPusherConfig.getBucket(),
             segmentPusherConfig.getBaseKey(),
             Predicates.alwaysTrue()
         );
       }
       catch (Exception e) {
        log.error("Error occurred while deleting segment files from s3. Error: %s", e.getMessage());
         throw new IOException(e);
       }
     }
   ```
   and 
   ```java
     @Override
     public void killOlderThan(long timestamp) throws IOException
     {
       try {
         S3Utils.deleteObjectsInPath(
             service,
             inputDataConfig,
             config.getS3Bucket(),
             config.getS3Prefix(),
             (object) -> object.getLastModified().getTime() < timestamp
         );
       }
       catch (Exception e) {
        log.error("Error occurred while deleting task log files from s3. Error: %s", e.getMessage());
         throw new IOException(e);
       }
     }
   ```
   
  `ObjectSummaryIterator` currently skips anything it thinks is a directory. If that is _not_ desirable for deleting segments and task logs, then I would suggest pushing the predicate down into `ObjectSummaryIterator`, so that the existing users can filter out directories while your usages can take everything or filter by timestamp as appropriate. In addition, if it shouldn't skip directories, it would probably be worth adding a test for what happens when one is present in the list objects response, if such a test isn't already there.
   
  disclaimer: I didn't test this against real S3, but I did run the unit tests, which passed after some modifications to the expected calls. I would still recommend reviewing these snippets carefully to make sure they are correct.
   
   What do you think?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services
