LakshSingla commented on code in PR #13960:
URL: https://github.com/apache/druid/pull/13960#discussion_r1144631455


##########
processing/src/main/java/org/apache/druid/storage/StorageConnector.java:
##########
@@ -105,26 +105,39 @@
    * with a basePath.
    * If the path is a directory, this method throws an exception.
    *
-   * @param path
-   * @throws IOException
+   * @param path to delete
+   * @throws IOException thrown in case of errors.
    */
   void deleteFile(String path) throws IOException;
 
+
+  /**
+   * Delete files present at the input paths. Most implementations prepend all the input paths
+   * with the basePath.
+   * <br/>
+   * This method is <b>recommended</b> in case we need to delete a batch of files.
+   * If the path is a directory, this method throws an exception.
+   *
+   * @param paths Iterable of the paths to delete.
+   * @throws IOException thrown in case of errors.
+   */
+  void deleteFiles(Iterable<String> paths) throws IOException;
+
   /**
    * Delete a directory pointed to by the path and also recursively deletes all files/directories in said directory.
    * Most implementations prepend the input path with a basePath.
    *
    * @param path path
-   * @throws IOException
+   * @throws IOException thrown in case of errors.
    */
   void deleteRecursively(String path) throws IOException;
 
   /**
-   * Returns a list containing all the files present in the path. The returned filenames should be such that joining
+   * Returns a lazy iterator containing all the files present in the path. The returned filenames should be such that joining
    * the dirName and the file name form the full path that can be used as the arguments for other methods of the storage
    * connector.
-   * For example, for a S3 path such as s3://bucket/parent1/parent2/child, the filename returned for the path
-   * "parent1/parent2" should be "child" and for "parent1" should be "parent2"
+   * For example, for a S3 path such as s3://bucket/parent1/parent2/child, the filename returned for the input path
+   * "parent1/parent2" should be "child" and for input "parent1" should be "parent2/child"
    */
-  List<String> listDir(String dirName);
+  Iterator<String> listDir(String dirName) throws IOException;

Review Comment:
   Instead of an Iterator, I think it would be better to return an Iterable and let the caller fetch an Iterator from it. The caller might want to iterate over the returned value again without making another list call.
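A minimal stand-alone sketch of that idea (the names here are illustrative, not the actual `StorageConnector` API): an `Iterable` whose `iterator()` re-issues the underlying listing, so the caller can traverse the results more than once.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class ListDirIterableSketch
{
  // Stand-in for the real storage listing call.
  private static List<String> fetchListing(String dirName)
  {
    return Arrays.asList("file1", "file2");
  }

  public static Iterable<String> listDir(String dirName)
  {
    // Iterable is a functional interface; every iterator() call starts a fresh listing.
    return () -> fetchListing(dirName).iterator();
  }

  public static void main(String[] args)
  {
    Iterable<String> files = listDir("dir");
    int first = 0;
    for (String f : files) {
      first++;
    }
    // A second pass works because iterator() returns a fresh iterator.
    int second = 0;
    for (String f : files) {
      second++;
    }
    System.out.println(first + " " + second); // prints "2 2"
  }
}
```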



##########
processing/src/main/java/org/apache/druid/frame/util/DurableStorageUtils.java:
##########
@@ -121,4 +120,14 @@ public static String getOutputsFileNameForPath(
         path
     );
   }
+
+  /**
+   * Tries to parse out the controller taskID from the input path.
+   * <br></br>
+   * For eg: for input path <b>controller_query_id/task/123</b> <br/> the function will return <b>controller_query_id</b>
+   */
+  public static String getControllerTaskIdWithPrefixFromPath(String path)
+  {
+    return path.split("/", 1)[0];

Review Comment:
   There should be some checks with an error message shown: `Invalid path provided. Cannot extract the controller id from the path [%s]`
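A hedged sketch of what such a check could look like; the method name comes from the diff above, but the exact validation, message format, and `split` limit are assumptions for illustration.

```java
public class PathValidationSketch
{
  public static String getControllerTaskIdWithPrefixFromPath(String path)
  {
    if (path == null || path.isEmpty()) {
      throw new IllegalArgumentException(
          String.format("Invalid path provided. Cannot extract the controller id from the path [%s]", path)
      );
    }
    // A limit of 2 isolates the first segment; a limit of 1 would return the whole path unchanged.
    return path.split("/", 2)[0];
  }

  public static void main(String[] args)
  {
    System.out.println(getControllerTaskIdWithPrefixFromPath("controller_query_id/task/123"));
    // prints "controller_query_id"
  }
}
```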



##########
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java:
##########
@@ -243,59 +243,73 @@ public static S3ObjectSummary getSingleObjectSummary(ServerSideEncryptingAmazonS
    * Delete the files from S3 in a specified bucket, matching a specified prefix and filter
    *
    * @param s3Client s3 client
-   * @param config   specifies the configuration to use when finding matching files in S3 to delete
+   * @param maxListingLength  maximum number of keys to fetch and delete at a time
    * @param bucket   s3 bucket
    * @param prefix   the file prefix
    * @param filter   function which returns true if the prefix file found should be deleted and false otherwise.
    *
-   * @throws Exception
+   * @throws Exception in case of errors
    */
+
   public static void deleteObjectsInPath(
       ServerSideEncryptingAmazonS3 s3Client,
-      S3InputDataConfig config,
+      int maxListingLength,
       String bucket,
       String prefix,
       Predicate<S3ObjectSummary> filter
   )
       throws Exception
   {
-    final List<DeleteObjectsRequest.KeyVersion> keysToDelete = new ArrayList<>(config.getMaxListingLength());
+    deleteObjectsInPath(s3Client, maxListingLength, bucket, prefix, filter, RetryUtils.DEFAULT_MAX_TRIES);
+  }
+
+  public static void deleteObjectsInPath(
+      ServerSideEncryptingAmazonS3 s3Client,
+      int maxListingLength,
+      String bucket,
+      String prefix,
+      Predicate<S3ObjectSummary> filter,
+      int maxRetries
+  )
+      throws Exception
+  {
+    final List<DeleteObjectsRequest.KeyVersion> keysToDelete = new ArrayList<>(maxListingLength);
     final ObjectSummaryIterator iterator = new ObjectSummaryIterator(
         s3Client,
         ImmutableList.of(new CloudObjectLocation(bucket, prefix).toUri("s3")),
-        config.getMaxListingLength()
+        maxListingLength
    );
 
    while (iterator.hasNext()) {
      final S3ObjectSummary nextObject = iterator.next();
      if (filter.apply(nextObject)) {
        keysToDelete.add(new DeleteObjectsRequest.KeyVersion(nextObject.getKey()));
-        if (keysToDelete.size() == config.getMaxListingLength()) {
-          deleteBucketKeys(s3Client, bucket, keysToDelete);
-          log.info("Deleted %d files", keysToDelete.size());
+        if (keysToDelete.size() == maxListingLength) {
+          deleteBucketKeys(s3Client, bucket, keysToDelete, maxRetries);

Review Comment:
   Here, for each batch of maxListingLength, we are retrying the operation if 
it fails `maxRetries` times. This amounts to a total of `maxListingLength * 
maxRetries` retries.
   I think it would be better if the `deleteBucketKeys` doesn't retry on it's 
own and we have a top level retry mechanism, something like:
   ```
   RetryUtils.retry(
      // Body of the function
      // deleteBucketKeys(s3Client, bucket, keysToDelete, 1);  // Donot retry 
the deleteBucketKeys() individually
      // Remaining retriable body of the function
   )
   ```
   WDYT?
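A self-contained sketch of the top-level retry idea. Druid's `RetryUtils` provides similar helpers; this stand-alone version is only illustrative and assumes `maxTries >= 1`.

```java
import java.util.concurrent.Callable;

public class TopLevelRetrySketch
{
  // Run the whole task under a single retry budget, instead of letting
  // each inner batch operation retry on its own.
  public static <T> T retry(Callable<T> task, int maxTries) throws Exception
  {
    Exception lastFailure = null;
    for (int attempt = 1; attempt <= maxTries; attempt++) {
      try {
        return task.call();
      }
      catch (Exception e) {
        lastFailure = e; // a real implementation could back off here
      }
    }
    throw lastFailure;
  }

  public static void main(String[] args) throws Exception
  {
    int value = retry(() -> 7, 3);
    System.out.println(value); // prints 7
  }
}
```

With this shape, `deleteObjectsInPath` could wrap its whole listing-and-deleting loop in one `retry(...)` call and pass a `maxTries` of 1 down to `deleteBucketKeys`, keeping the total number of attempts bounded.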



##########
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnector.java:
##########


Review Comment:
   For the functions that have retries added, we should update the Javadocs to mention that they are retried up to `config.getMaxRetry()` times. Alternatively, we should update the class-level Javadoc to mention that all the functions are retriable.



##########
extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/S3StorageConnectorTest.java:
##########
@@ -214,21 +237,33 @@ public void pathDeleteRecursively() throws IOException
   }
 
   @Test
-  public void testListDir()
+  public void testListDir() throws IOException
   {
     EasyMock.reset(S3_CLIENT, TEST_RESULT);
 
     S3ObjectSummary s3ObjectSummary = new S3ObjectSummary();
     s3ObjectSummary.setBucketName(BUCKET);
     s3ObjectSummary.setKey(PREFIX + "/test/" + TEST_FILE);
+    s3ObjectSummary.setSize(1);
 
     EasyMock.expect(TEST_RESULT.getObjectSummaries()).andReturn(Collections.singletonList(s3ObjectSummary)).times(2);
     EasyMock.expect(TEST_RESULT.isTruncated()).andReturn(false);
+    EasyMock.expect(TEST_RESULT.getNextContinuationToken()).andReturn(null);
     EasyMock.expect(S3_CLIENT.listObjectsV2((ListObjectsV2Request) EasyMock.anyObject()))
             .andReturn(TEST_RESULT);
     EasyMock.replay(S3_CLIENT, TEST_RESULT);
 
-    List<String> listDirResult = storageConnector.listDir("/");
+    List<String> listDirResult = Lists.newArrayList(storageConnector.listDir("test/"));
     Assert.assertEquals(ImmutableList.of(TEST_FILE), listDirResult);
   }
+
+  private String convetDelReqToString(DeleteObjectsRequest deleteObjectsRequest)

Review Comment:
   ```suggestion
     private String convertDeleteObjectsRequestToString(DeleteObjectsRequest deleteObjectsRequest)
   ```



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/DurableStorageCleaner.java:
##########
@@ -93,19 +93,41 @@ public void schedule(ScheduledExecutorService exec)
               return;
             }
             TaskRunner taskRunner = taskRunnerOptional.get();
-            Set<String> allDirectories = new HashSet<>(storageConnector.listDir("/"));
+            Iterator<String> allFiles = storageConnector.listDir("");
             Set<String> runningTaskIds = taskRunner.getRunningTasks()
                                                    .stream()
                                                    .map(TaskRunnerWorkItem::getTaskId)
                                                    .map(DurableStorageUtils::getControllerDirectory)
                                                    .collect(Collectors.toSet());
-            Set<String> unknownDirectories = Sets.difference(allDirectories, runningTaskIds);
-            LOG.info(
-                "Following directories do not have a corresponding MSQ task associated with it:\n%s\nThese will get cleaned up.",
-                unknownDirectories
-            );
-            for (String unknownDirectory : unknownDirectories) {
-              storageConnector.deleteRecursively(unknownDirectory);
+
+            Set<String> filesToRemove = new HashSet<>();
+            while (allFiles.hasNext()) {
+              String currentFile = allFiles.next();
+              String taskIdFromPathOrEmpty = DurableStorageUtils.getControllerTaskIdWithPrefixFromPath(currentFile);
+              if (taskIdFromPathOrEmpty != null && !taskIdFromPathOrEmpty.isEmpty()) {
+                if (runningTaskIds.contains(taskIdFromPathOrEmpty)) {

Review Comment:
   nit: This can be collapsed to something like
   ```suggestion
                   if (!runningTaskIds.contains(taskIdFromPathOrEmpty)) {
                        // filesToRemove.add(currentFile);
                   }
   ```



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/DurableStorageCleaner.java:
##########
@@ -93,19 +93,41 @@ public void schedule(ScheduledExecutorService exec)
               return;
             }
             TaskRunner taskRunner = taskRunnerOptional.get();
-            Set<String> allDirectories = new HashSet<>(storageConnector.listDir("/"));
+            Iterator<String> allFiles = storageConnector.listDir("");
             Set<String> runningTaskIds = taskRunner.getRunningTasks()
                                                    .stream()
                                                    .map(TaskRunnerWorkItem::getTaskId)
                                                    .map(DurableStorageUtils::getControllerDirectory)
                                                    .collect(Collectors.toSet());
-            Set<String> unknownDirectories = Sets.difference(allDirectories, runningTaskIds);
-            LOG.info(
-                "Following directories do not have a corresponding MSQ task associated with it:\n%s\nThese will get cleaned up.",
-                unknownDirectories
-            );
-            for (String unknownDirectory : unknownDirectories) {
-              storageConnector.deleteRecursively(unknownDirectory);
+
+            Set<String> filesToRemove = new HashSet<>();
+            while (allFiles.hasNext()) {
+              String currentFile = allFiles.next();
+              String taskIdFromPathOrEmpty = DurableStorageUtils.getControllerTaskIdWithPrefixFromPath(currentFile);
+              if (taskIdFromPathOrEmpty != null && !taskIdFromPathOrEmpty.isEmpty()) {
+                if (runningTaskIds.contains(taskIdFromPathOrEmpty)) {
+                  // do nothing
+                } else {
+                  filesToRemove.add(currentFile);
+                }
+              }
+            }
+            if (filesToRemove.isEmpty()) {
+              LOG.info("Nothing to delete");

Review Comment:
   This log message can be made more descriptive about what there was nothing of, e.g. stating that no stale durable storage files were found to delete.



##########
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnector.java:
##########
@@ -237,71 +266,95 @@ public OutputStream write(String path) throws IOException
   }
 
   @Override
-  public void deleteFile(String path)
+  public void deleteFile(String path) throws IOException
   {
-    s3Client.deleteObject(config.getBucket(), objectPath(path));
+    try {
+      S3Utils.retryS3Operation(() -> {
+        s3Client.deleteObject(config.getBucket(), objectPath(path));
+        return null;
+      }, config.getMaxRetry());
+    }
+    catch (Exception e) {
+      log.error("Error occurred while deleting file at path [%s]. Error: [%s]", path, e.getMessage());
+      throw new IOException(e);
+    }
   }
 
   @Override
-  public void deleteRecursively(String dirName)
+  public void deleteFiles(Iterable<String> paths) throws IOException
   {
-    ListObjectsV2Request listObjectsRequest = new ListObjectsV2Request()
-        .withBucketName(config.getBucket())
-        .withPrefix(objectPath(dirName));
-    ListObjectsV2Result objectListing = s3Client.listObjectsV2(listObjectsRequest);
-
-    while (objectListing.getObjectSummaries().size() > 0) {
-      List<DeleteObjectsRequest.KeyVersion> deleteObjectsRequestKeys = objectListing.getObjectSummaries()
-                                                                                    .stream()
-                                                                                    .map(S3ObjectSummary::getKey)
-                                                                                    .map(DeleteObjectsRequest.KeyVersion::new)
-                                                                                    .collect(Collectors.toList());
-      DeleteObjectsRequest deleteObjectsRequest = new DeleteObjectsRequest(config.getBucket()).withKeys(
-          deleteObjectsRequestKeys);
-      s3Client.deleteObjects(deleteObjectsRequest);
+    int currentItemSize = 0;
+    List<DeleteObjectsRequest.KeyVersion> versions = new ArrayList<>();
 
-      // If the listing is truncated, all S3 objects have been deleted, otherwise, fetch more using the continuation token
-      if (objectListing.isTruncated()) {
-        listObjectsRequest.withContinuationToken(objectListing.getContinuationToken());
-        objectListing = s3Client.listObjectsV2(listObjectsRequest);
-      } else {
-        break;
+    for (String path : paths) {
+      // appending base path to each path
+      versions.add(new DeleteObjectsRequest.KeyVersion(objectPath(path)));
+      currentItemSize++;
+      if (currentItemSize == MAX_NUMBER_OF_LISTINGS) {
+        deleteKeys(versions);
+        // resetting trackers
+        versions.clear();
+        currentItemSize = 0;
       }
     }
+    // deleting remaining elements
+    if (currentItemSize != 0) {
+      deleteKeys(versions);
+    }
   }
 
-  @Override
-  public List<String> listDir(String dirName)
+  private void deleteKeys(List<DeleteObjectsRequest.KeyVersion> versions) throws IOException

Review Comment:
   Should indicate in the method signature (or Javadoc) that this is a retriable operation. Also, similar comment as above about retrying at the top level rather than for the individual batch requests.
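For reference, the batching pattern `deleteFiles` uses can be sketched generically; this stand-alone version only records batches instead of issuing S3 requests, and the names are illustrative.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class BatchedDeleteSketch
{
  static final int MAX_BATCH_SIZE = 3;

  // Accumulate keys up to the batch size, flush each full batch,
  // then flush the remainder at the end.
  public static List<List<String>> planBatches(Iterable<String> paths)
  {
    List<List<String>> batches = new ArrayList<>();
    List<String> current = new ArrayList<>();
    for (String path : paths) {
      current.add(path);
      if (current.size() == MAX_BATCH_SIZE) {
        batches.add(new ArrayList<>(current));
        current.clear(); // reset the tracker for the next batch
      }
    }
    if (!current.isEmpty()) {
      batches.add(current); // remaining elements
    }
    return batches;
  }

  public static void main(String[] args)
  {
    System.out.println(planBatches(Arrays.asList("a", "b", "c", "d")).size()); // prints 2
  }
}
```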



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

