LakshSingla commented on code in PR #13960:
URL: https://github.com/apache/druid/pull/13960#discussion_r1144631455
##########
processing/src/main/java/org/apache/druid/storage/StorageConnector.java:
##########
@@ -105,26 +105,39 @@
* with a basePath.
* If the path is a directory, this method throws an exception.
*
- * @param path
- * @throws IOException
+ * @param path to delete
+ * @throws IOException thrown in case of errors.
*/
void deleteFile(String path) throws IOException;
+
+ /**
+ * Delete files present at the input paths. Most implementations prepend all the input paths
+ * with the basePath.
+ * <br/>
+ * This method is <b>recommended</b> in case we need to delete a batch of files.
+ * If the path is a directory, this method throws an exception.
+ *
+ * @param paths Iterable of the paths to delete.
+ * @throws IOException thrown in case of errors.
+ */
+ void deleteFiles(Iterable<String> paths) throws IOException;
+
/**
* Delete a directory pointed to by the path and also recursively deletes all files/directories in said directory.
* Most implementations prepend the input path with a basePath.
*
* @param path path
- * @throws IOException
+ * @throws IOException thrown in case of errors.
*/
void deleteRecursively(String path) throws IOException;
/**
- * Returns a list containing all the files present in the path. The returned filenames should be such that joining
+ * Returns a lazy iterator containing all the files present in the path. The returned filenames should be such that joining
* the dirName and the file name form the full path that can be used as the arguments for other methods of the storage
* connector.
- * For example, for a S3 path such as s3://bucket/parent1/parent2/child, the filename returned for the path
- * "parent1/parent2" should be "child" and for "parent1" should be "parent2"
+ * For example, for a S3 path such as s3://bucket/parent1/parent2/child, the filename returned for the input path
+ * "parent1/parent2" should be "child" and for input "parent1" should be "parent2/child"
*/
- List<String> listDir(String dirName);
+ Iterator<String> listDir(String dirName) throws IOException;
Review Comment:
Instead of an Iterator, I think it would be better to return an Iterable and let the caller fetch the Iterator from it. The caller might want to iterate over the returned value again without calling `listDir` again.
##########
processing/src/main/java/org/apache/druid/frame/util/DurableStorageUtils.java:
##########
@@ -121,4 +120,14 @@ public static String getOutputsFileNameForPath(
path
);
}
+
+ /**
+ * Tries to parse out the controller taskID from the input path.
+ * <br></br>
+ * For eg: for input path <b>controller_query_id/task/123</b> <br/>the function will return <b>controller_query_id</b>
+ */
+ public static String getControllerTaskIdWithPrefixFromPath(String path)
+ {
+ return path.split("/", 1)[0];
Review Comment:
There should be some validation here, failing with an error message such as `Invalid path provided. Cannot extract the controller id from the path [%s]`.
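A minimal sketch of the suggested validation; `ControllerIdSketch` is hypothetical and uses plain `IllegalArgumentException` to stay self-contained. Separately, `path.split("/", 1)` in the diff never splits: with a positive limit n, Java's `String.split` applies the pattern at most n - 1 times, so a limit of 1 returns the whole path. The sketch therefore uses a limit of 2.

```java
/**
 * Hypothetical sketch of the suggested validation; not the PR's code.
 */
class ControllerIdSketch
{
  private static final String ERROR_FORMAT =
      "Invalid path provided. Cannot extract the controller id from the path [%s]";

  static String getControllerTaskIdWithPrefixFromPath(String path)
  {
    if (path == null || path.isEmpty()) {
      throw new IllegalArgumentException(String.format(ERROR_FORMAT, path));
    }
    // A limit of 2 splits only at the first '/', e.g. "a/b/c" -> ["a", "b/c"].
    String[] parts = path.split("/", 2);
    if (parts[0].isEmpty()) {
      // The path starts with '/', so there is no leading controller segment.
      throw new IllegalArgumentException(String.format(ERROR_FORMAT, path));
    }
    return parts[0];
  }
}
```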
##########
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java:
##########
@@ -243,59 +243,73 @@ public static S3ObjectSummary getSingleObjectSummary(ServerSideEncryptingAmazonS
* Delete the files from S3 in a specified bucket, matching a specified prefix and filter
*
* @param s3Client s3 client
- * @param config specifies the configuration to use when finding matching files in S3 to delete
+ * @param maxListingLength maximum number of keys to fetch and delete at a time
* @param bucket s3 bucket
* @param prefix the file prefix
* @param filter function which returns true if the prefix file found should be deleted and false otherwise.
*
- * @throws Exception
+ * @throws Exception in case of errors
*/
+
public static void deleteObjectsInPath(
ServerSideEncryptingAmazonS3 s3Client,
- S3InputDataConfig config,
+ int maxListingLength,
String bucket,
String prefix,
Predicate<S3ObjectSummary> filter
)
throws Exception
{
- final List<DeleteObjectsRequest.KeyVersion> keysToDelete = new ArrayList<>(config.getMaxListingLength());
+ deleteObjectsInPath(s3Client, maxListingLength, bucket, prefix, filter, RetryUtils.DEFAULT_MAX_TRIES);
+ }
+
+ public static void deleteObjectsInPath(
+ ServerSideEncryptingAmazonS3 s3Client,
+ int maxListingLength,
+ String bucket,
+ String prefix,
+ Predicate<S3ObjectSummary> filter,
+ int maxRetries
+ )
+ throws Exception
+ {
+ final List<DeleteObjectsRequest.KeyVersion> keysToDelete = new ArrayList<>(maxListingLength);
final ObjectSummaryIterator iterator = new ObjectSummaryIterator(
s3Client,
ImmutableList.of(new CloudObjectLocation(bucket, prefix).toUri("s3")),
- config.getMaxListingLength()
+ maxListingLength
);
while (iterator.hasNext()) {
final S3ObjectSummary nextObject = iterator.next();
if (filter.apply(nextObject)) {
keysToDelete.add(new DeleteObjectsRequest.KeyVersion(nextObject.getKey()));
- if (keysToDelete.size() == config.getMaxListingLength()) {
- deleteBucketKeys(s3Client, bucket, keysToDelete);
- log.info("Deleted %d files", keysToDelete.size());
+ if (keysToDelete.size() == maxListingLength) {
+ deleteBucketKeys(s3Client, bucket, keysToDelete, maxRetries);
Review Comment:
Here, each batch of `maxListingLength` keys is retried up to `maxRetries` times if it fails. This amounts to up to `maxRetries` retries for every batch, i.e. `(number of batches) * maxRetries` retries in total.
I think it would be better if `deleteBucketKeys` didn't retry on its own and we had a top-level retry mechanism, something like:
```
RetryUtils.retry(
// Body of the function
// deleteBucketKeys(s3Client, bucket, keysToDelete, 1); // Do not retry deleteBucketKeys() individually
// Remaining retriable body of the function
)
```
WDYT?
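A minimal sketch of the top-level retry, assuming a hypothetical `retry(Callable, maxTries)` helper in place of `RetryUtils.retry` (no real signature is assumed) and a `Consumer` that issues one bulk delete per batch without retrying on its own. The whole batched deletion is attempted at most `maxTries` times.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.function.Consumer;

/**
 * Hypothetical sketch: a single retry loop wraps the whole batched deletion,
 * so the operation as a whole is attempted at most maxTries times.
 */
class TopLevelRetrySketch
{
  // Stand-in for a retry helper such as RetryUtils.retry.
  static <T> T retry(Callable<T> task, int maxTries) throws Exception
  {
    if (maxTries < 1) {
      throw new IllegalArgumentException("maxTries must be at least 1");
    }
    Exception lastFailure = null;
    for (int attempt = 1; attempt <= maxTries; attempt++) {
      try {
        return task.call();
      }
      catch (Exception e) {
        lastFailure = e;
      }
    }
    throw lastFailure;
  }

  // Issues one bulk delete per batch of at most batchSize keys; the deleter itself does not retry.
  static void deleteInBatches(List<String> keys, int batchSize, Consumer<List<String>> deleter)
  {
    if (batchSize < 1) {
      throw new IllegalArgumentException("batchSize must be at least 1");
    }
    for (int start = 0; start < keys.size(); start += batchSize) {
      deleter.accept(new ArrayList<>(keys.subList(start, Math.min(start + batchSize, keys.size()))));
    }
  }

  static void deleteWithTopLevelRetry(
      List<String> keys,
      int batchSize,
      Consumer<List<String>> deleter,
      int maxTries
  ) throws Exception
  {
    retry(() -> {
      deleteInBatches(keys, batchSize, deleter);
      return null;
    }, maxTries);
  }
}
```

The trade-off is that a retry re-issues batches that already succeeded. That is harmless as long as deleting an already-deleted key is a no-op in the target store; in `deleteObjectsInPath`, a fresh listing on retry would typically not return the keys that were already removed.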
##########
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnector.java:
##########
Review Comment:
For the functions that have retries added, we should update the Javadocs to mention that they are retried up to `config.getMaxRetry()` times. Alternatively, we could update the class-level Javadoc to mention that all the functions are retriable.
##########
extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/S3StorageConnectorTest.java:
##########
@@ -214,21 +237,33 @@ public void pathDeleteRecursively() throws IOException
}
@Test
- public void testListDir()
+ public void testListDir() throws IOException
{
EasyMock.reset(S3_CLIENT, TEST_RESULT);
S3ObjectSummary s3ObjectSummary = new S3ObjectSummary();
s3ObjectSummary.setBucketName(BUCKET);
s3ObjectSummary.setKey(PREFIX + "/test/" + TEST_FILE);
+ s3ObjectSummary.setSize(1);
EasyMock.expect(TEST_RESULT.getObjectSummaries()).andReturn(Collections.singletonList(s3ObjectSummary)).times(2);
EasyMock.expect(TEST_RESULT.isTruncated()).andReturn(false);
+ EasyMock.expect(TEST_RESULT.getNextContinuationToken()).andReturn(null);
EasyMock.expect(S3_CLIENT.listObjectsV2((ListObjectsV2Request) EasyMock.anyObject()))
.andReturn(TEST_RESULT);
EasyMock.replay(S3_CLIENT, TEST_RESULT);
- List<String> listDirResult = storageConnector.listDir("/");
+ List<String> listDirResult = Lists.newArrayList(storageConnector.listDir("test/"));
Assert.assertEquals(ImmutableList.of(TEST_FILE), listDirResult);
}
+
+ private String convetDelReqToString(DeleteObjectsRequest deleteObjectsRequest)
Review Comment:
```suggestion
private String convertDeleteObjectsRequestToString(DeleteObjectsRequest deleteObjectsRequest)
```
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/DurableStorageCleaner.java:
##########
@@ -93,19 +93,41 @@ public void schedule(ScheduledExecutorService exec)
return;
}
TaskRunner taskRunner = taskRunnerOptional.get();
- Set<String> allDirectories = new HashSet<>(storageConnector.listDir("/"));
+ Iterator<String> allFiles = storageConnector.listDir("");
Set<String> runningTaskIds = taskRunner.getRunningTasks()
.stream()
.map(TaskRunnerWorkItem::getTaskId)
.map(DurableStorageUtils::getControllerDirectory)
.collect(Collectors.toSet());
- Set<String> unknownDirectories = Sets.difference(allDirectories, runningTaskIds);
- LOG.info(
- "Following directories do not have a corresponding MSQ task associated with it:\n%s\nThese will get cleaned up.",
- unknownDirectories
- );
- for (String unknownDirectory : unknownDirectories) {
- storageConnector.deleteRecursively(unknownDirectory);
+
+ Set<String> filesToRemove = new HashSet<>();
+ while (allFiles.hasNext()) {
+ String currentFile = allFiles.next();
+ String taskIdFromPathOrEmpty = DurableStorageUtils.getControllerTaskIdWithPrefixFromPath(currentFile);
+ if (taskIdFromPathOrEmpty != null && !taskIdFromPathOrEmpty.isEmpty()) {
+ if (runningTaskIds.contains(taskIdFromPathOrEmpty)) {
Review Comment:
nit: This can be collapsed to something like
```suggestion
if (!runningTaskIds.contains(taskIdFromPathOrEmpty)) {
filesToRemove.add(currentFile);
}
```
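With the collapse applied, the whole filtering loop reduces to one combined condition. A minimal sketch, where `CleanerFilterSketch` is hypothetical and the `taskIdFromPath` function stands in for `DurableStorageUtils.getControllerTaskIdWithPrefixFromPath`:

```java
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.function.Function;

/**
 * Hypothetical sketch of the cleaner's filtering loop with the suggested collapse applied.
 */
class CleanerFilterSketch
{
  static Set<String> filesToRemove(
      Iterator<String> allFiles,
      Set<String> runningTaskIds,
      Function<String, String> taskIdFromPath
  )
  {
    Set<String> filesToRemove = new HashSet<>();
    while (allFiles.hasNext()) {
      String currentFile = allFiles.next();
      String taskId = taskIdFromPath.apply(currentFile);
      // Keep files of running controllers; skip paths with no extractable task ID.
      if (taskId != null && !taskId.isEmpty() && !runningTaskIds.contains(taskId)) {
        filesToRemove.add(currentFile);
      }
    }
    return filesToRemove;
  }
}
```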
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/DurableStorageCleaner.java:
##########
@@ -93,19 +93,41 @@ public void schedule(ScheduledExecutorService exec)
return;
}
TaskRunner taskRunner = taskRunnerOptional.get();
- Set<String> allDirectories = new HashSet<>(storageConnector.listDir("/"));
+ Iterator<String> allFiles = storageConnector.listDir("");
Set<String> runningTaskIds = taskRunner.getRunningTasks()
.stream()
.map(TaskRunnerWorkItem::getTaskId)
.map(DurableStorageUtils::getControllerDirectory)
.collect(Collectors.toSet());
- Set<String> unknownDirectories = Sets.difference(allDirectories, runningTaskIds);
- LOG.info(
- "Following directories do not have a corresponding MSQ task associated with it:\n%s\nThese will get cleaned up.",
- unknownDirectories
- );
- for (String unknownDirectory : unknownDirectories) {
- storageConnector.deleteRecursively(unknownDirectory);
+
+ Set<String> filesToRemove = new HashSet<>();
+ while (allFiles.hasNext()) {
+ String currentFile = allFiles.next();
+ String taskIdFromPathOrEmpty = DurableStorageUtils.getControllerTaskIdWithPrefixFromPath(currentFile);
+ if (taskIdFromPathOrEmpty != null && !taskIdFromPathOrEmpty.isEmpty()) {
+ if (runningTaskIds.contains(taskIdFromPathOrEmpty)) {
+ // do nothing
+ } else {
+ filesToRemove.add(currentFile);
+ }
+ }
+ }
+ if (filesToRemove.isEmpty()) {
+ LOG.info("Nothing to delete");
Review Comment:
This log message should say more specifically what was checked and found to have nothing to delete.
##########
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnector.java:
##########
@@ -237,71 +266,95 @@ public OutputStream write(String path) throws IOException
}
@Override
- public void deleteFile(String path)
+ public void deleteFile(String path) throws IOException
{
- s3Client.deleteObject(config.getBucket(), objectPath(path));
+ try {
+ S3Utils.retryS3Operation(() -> {
+ s3Client.deleteObject(config.getBucket(), objectPath(path));
+ return null;
+ }, config.getMaxRetry());
+ }
+ catch (Exception e) {
+ log.error("Error occurred while deleting file at path [%s]. Error: [%s]", path, e.getMessage());
+ throw new IOException(e);
+ }
}
@Override
- public void deleteRecursively(String dirName)
+ public void deleteFiles(Iterable<String> paths) throws IOException
{
- ListObjectsV2Request listObjectsRequest = new ListObjectsV2Request()
- .withBucketName(config.getBucket())
- .withPrefix(objectPath(dirName));
- ListObjectsV2Result objectListing = s3Client.listObjectsV2(listObjectsRequest);
-
- while (objectListing.getObjectSummaries().size() > 0) {
- List<DeleteObjectsRequest.KeyVersion> deleteObjectsRequestKeys = objectListing.getObjectSummaries()
- .stream()
- .map(S3ObjectSummary::getKey)
- .map(DeleteObjectsRequest.KeyVersion::new)
- .collect(Collectors.toList());
- DeleteObjectsRequest deleteObjectsRequest = new DeleteObjectsRequest(config.getBucket()).withKeys(
- deleteObjectsRequestKeys);
- s3Client.deleteObjects(deleteObjectsRequest);
+ int currentItemSize = 0;
+ List<DeleteObjectsRequest.KeyVersion> versions = new ArrayList<>();
- // If the listing is truncated, all S3 objects have been deleted, otherwise, fetch more using the continuation token
- if (objectListing.isTruncated()) {
- listObjectsRequest.withContinuationToken(objectListing.getContinuationToken());
- objectListing = s3Client.listObjectsV2(listObjectsRequest);
- } else {
- break;
+ for (String path : paths) {
+ // appending base path to each path
+ versions.add(new DeleteObjectsRequest.KeyVersion(objectPath(path)));
+ currentItemSize++;
+ if (currentItemSize == MAX_NUMBER_OF_LISTINGS) {
+ deleteKeys(versions);
+ // resetting trackers
+ versions.clear();
+ currentItemSize = 0;
}
}
+ // deleting remaining elements
+ if (currentItemSize != 0) {
+ deleteKeys(versions);
+ }
}
- @Override
- public List<String> listDir(String dirName)
+ private void deleteKeys(List<DeleteObjectsRequest.KeyVersion> versions) throws IOException
Review Comment:
This should indicate in the method signature that it is a retriable operation. Also, the same comment as above applies: retrying should be a top-level operation rather than applied to the individual batch requests.
--
This is an automated message from the Apache Git Service.