bkietz commented on a change in pull request #8818:
URL: https://github.com/apache/arrow/pull/8818#discussion_r537767675
##########
File path: cpp/src/arrow/filesystem/s3fs.cc
##########
@@ -1080,6 +1082,134 @@ void FileObjectToInfo(const S3Model::Object& obj,
FileInfo* info) {
info->set_mtime(FromAwsDatetime(obj.GetLastModified()));
}
+struct TreeWalker : public std::enable_shared_from_this<TreeWalker> {
+ using ResultHandler = std::function<Status(const std::string& prefix,
+ const
S3Model::ListObjectsV2Result&)>;
+ using ErrorHandler = std::function<Status(const AWSError<S3Errors>& error)>;
+ using RecursionHandler = std::function<Result<bool>(int32_t nesting_depth)>;
+
+ Aws::S3::S3Client* client_;
+ const std::string bucket_;
+ const std::string base_dir_;
+ const int32_t max_keys_;
+ const ResultHandler result_handler_;
+ const ErrorHandler error_handler_;
+ const RecursionHandler recursion_handler_;
+
+ template <typename... Args>
+ static Status Walk(Args&&... args) {
+ auto self = std::make_shared<TreeWalker>(std::forward<Args>(args)...);
+ return self->DoWalk();
+ }
+
+ TreeWalker(Aws::S3::S3Client* client, std::string bucket, std::string
base_dir,
+ int32_t max_keys, ResultHandler result_handler, ErrorHandler
error_handler,
+ RecursionHandler recursion_handler)
+ : client_(std::move(client)),
+ bucket_(std::move(bucket)),
+ base_dir_(std::move(base_dir)),
+ max_keys_(max_keys),
+ result_handler_(std::move(result_handler)),
+ error_handler_(std::move(error_handler)),
+ recursion_handler_(std::move(recursion_handler)) {}
+
+ private:
+ std::mutex mutex_;
+ Future<> future_;
+ std::atomic<int32_t> num_in_flight_;
+
+ Status DoWalk() {
+ future_ = decltype(future_)::Make();
+ num_in_flight_ = 0;
+ WalkChild(base_dir_, /*nesting_depth=*/0);
+ // When this returns, ListObjectsV2 tasks either have finished or will
exit early
+ return future_.status();
+ }
+
+ bool is_finished() const { return future_.is_finished(); }
+
+ void ListObjectsFinished(Status st) {
+ const auto in_flight = --num_in_flight_;
+ if (!st.ok() || !in_flight) {
+ future_.MarkFinished(std::move(st));
+ }
+ }
+
+ struct ListObjectsV2Handler {
+ std::shared_ptr<TreeWalker> walker;
+ std::string prefix;
+ int32_t nesting_depth;
+ S3Model::ListObjectsV2Request req;
+
+ void operator()(const Aws::S3::S3Client*, const
S3Model::ListObjectsV2Request&,
+ const S3Model::ListObjectsV2Outcome& outcome,
+ const std::shared_ptr<const
Aws::Client::AsyncCallerContext>&) {
Review comment:
Nit: this would be more readable if you rewrote it as
```c++
Result<bool> DoHandle(... args);
void operator()(... args) {
std::unique_lock<std::mutex> guard(walker->mutex_);
auto maybe_truncated = DoHandle(args...);
if (maybe_truncated.ok() && *maybe_truncated) {
DCHECK(!result.GetNextContinuationToken().empty());
req.SetContinuationToken(result.GetNextContinuationToken());
walker->client_->ListObjectsV2Async(req, *this);
return;
}
walker->ListObjectsFinished(std::move(maybe_truncated).status());
}
```
##########
File path: cpp/src/arrow/filesystem/s3fs.cc
##########
@@ -1242,27 +1372,45 @@ class S3FileSystem::Impl {
// Recursive workhorse for GetTargetStats(FileSelector...)
Status Walk(const FileSelector& select, const std::string& bucket,
const std::string& key, std::vector<FileInfo>* out) {
- int32_t nesting_depth = 0;
- return Walk(select, bucket, key, nesting_depth, out);
- }
+ bool is_empty = true;
- Status Walk(const FileSelector& select, const std::string& bucket,
- const std::string& key, int32_t nesting_depth,
std::vector<FileInfo>* out) {
- if (nesting_depth >= kMaxNestingDepth) {
- return Status::IOError("S3 filesystem tree exceeds maximum nesting depth
(",
- kMaxNestingDepth, ")");
- }
+ auto handle_error = [&](const AWSError<S3Errors>& error) -> Status {
+ if (select.allow_not_found && IsNotFound(error)) {
+ return Status::OK();
+ }
+ return ErrorToStatus(std::forward_as_tuple("When listing objects under
key '", key,
+ "' in bucket '", bucket, "':
"),
+ error);
+ };
- bool is_empty = true;
- std::vector<std::string> child_keys;
+ auto handle_recursion = [&](int32_t nesting_depth) -> Result<bool> {
+ if (nesting_depth >= kMaxNestingDepth) {
+ return Status::IOError("S3 filesystem tree exceeds maximum nesting
depth (",
+ kMaxNestingDepth, ")");
+ }
Review comment:
Since this check is common to any recursive walk, should it be inlined in
`ListObjectsV2Handler::DoHandle` or similar?
##########
File path: cpp/src/arrow/filesystem/s3fs.cc
##########
@@ -1080,6 +1082,134 @@ void FileObjectToInfo(const S3Model::Object& obj,
FileInfo* info) {
info->set_mtime(FromAwsDatetime(obj.GetLastModified()));
}
+struct TreeWalker : public std::enable_shared_from_this<TreeWalker> {
+ using ResultHandler = std::function<Status(const std::string& prefix,
+ const
S3Model::ListObjectsV2Result&)>;
+ using ErrorHandler = std::function<Status(const AWSError<S3Errors>& error)>;
+ using RecursionHandler = std::function<Result<bool>(int32_t nesting_depth)>;
+
+ Aws::S3::S3Client* client_;
+ const std::string bucket_;
+ const std::string base_dir_;
+ const int32_t max_keys_;
+ const ResultHandler result_handler_;
+ const ErrorHandler error_handler_;
+ const RecursionHandler recursion_handler_;
+
+ template <typename... Args>
+ static Status Walk(Args&&... args) {
+ auto self = std::make_shared<TreeWalker>(std::forward<Args>(args)...);
+ return self->DoWalk();
+ }
+
+ TreeWalker(Aws::S3::S3Client* client, std::string bucket, std::string
base_dir,
+ int32_t max_keys, ResultHandler result_handler, ErrorHandler
error_handler,
+ RecursionHandler recursion_handler)
+ : client_(std::move(client)),
+ bucket_(std::move(bucket)),
+ base_dir_(std::move(base_dir)),
+ max_keys_(max_keys),
+ result_handler_(std::move(result_handler)),
+ error_handler_(std::move(error_handler)),
+ recursion_handler_(std::move(recursion_handler)) {}
+
+ private:
+ std::mutex mutex_;
+ Future<> future_;
+ std::atomic<int32_t> num_in_flight_;
+
+ Status DoWalk() {
+ future_ = decltype(future_)::Make();
+ num_in_flight_ = 0;
+ WalkChild(base_dir_, /*nesting_depth=*/0);
+ // When this returns, ListObjectsV2 tasks either have finished or will
exit early
+ return future_.status();
+ }
+
+ bool is_finished() const { return future_.is_finished(); }
+
+ void ListObjectsFinished(Status st) {
+ const auto in_flight = --num_in_flight_;
+ if (!st.ok() || !in_flight) {
+ future_.MarkFinished(std::move(st));
+ }
+ }
+
+ struct ListObjectsV2Handler {
+ std::shared_ptr<TreeWalker> walker;
+ std::string prefix;
+ int32_t nesting_depth;
+ S3Model::ListObjectsV2Request req;
+
+ void operator()(const Aws::S3::S3Client*, const
S3Model::ListObjectsV2Request&,
+ const S3Model::ListObjectsV2Outcome& outcome,
+ const std::shared_ptr<const
Aws::Client::AsyncCallerContext>&) {
+ // Serialize calls to operation-specific handlers
+ std::unique_lock<std::mutex> guard(walker->mutex_);
+ if (walker->is_finished()) {
+ // Early exit: avoid executing handlers if DoWalk() returned
+ return;
+ }
+ if (!outcome.IsSuccess()) {
+ Status st = walker->error_handler_(outcome.GetError());
+ walker->ListObjectsFinished(std::move(st));
+ return;
+ }
+ const auto& result = outcome.GetResult();
+ bool recurse = result.GetCommonPrefixes().size() > 0;
+ if (recurse) {
+ auto maybe_recurse = walker->recursion_handler_(nesting_depth + 1);
+ if (!maybe_recurse.ok()) {
+ walker->ListObjectsFinished(maybe_recurse.status());
+ return;
+ }
+ recurse &= *maybe_recurse;
+ }
+ Status st = walker->result_handler_(prefix, result);
+ if (!st.ok()) {
+ walker->ListObjectsFinished(std::move(st));
+ return;
+ }
+ if (recurse) {
+ walker->WalkChildren(result, nesting_depth + 1);
+ }
+ if (result.GetIsTruncated()) {
+ DCHECK(!result.GetNextContinuationToken().empty());
+ req.SetContinuationToken(result.GetNextContinuationToken());
+ walker->client_->ListObjectsV2Async(req, *this);
+ } else {
+ walker->ListObjectsFinished(Status::OK());
+ }
+ }
+
+ void Start() {
+ req.SetBucket(ToAwsString(walker->bucket_));
+ if (!prefix.empty()) {
+ req.SetPrefix(ToAwsString(prefix) + kSep);
+ }
+ req.SetDelimiter(Aws::String() + kSep);
+ req.SetMaxKeys(walker->max_keys_);
+ walker->client_->ListObjectsV2Async(req, *this);
+ }
+ };
+
+ void WalkChild(std::string key, int32_t nesting_depth) {
+ ListObjectsV2Handler handler{shared_from_this(), std::move(key),
nesting_depth, {}};
+ ++num_in_flight_;
+ handler.Start();
Review comment:
Nit: having a `Start` method on a handler is counterintuitive. I'd prefer
it be inlined here and `req` not be a member. (It's referenced in
`ListObjectsV2Handler::operator()`, but it still doesn't need to be a member,
since the same `ListObjectsV2Request` is passed to that operator as an argument;
a copy of it can be updated with the continuation token.)
##########
File path: cpp/src/arrow/filesystem/s3fs.cc
##########
@@ -1317,36 +1434,24 @@ class S3FileSystem::Impl {
return PathNotFound(bucket, key);
}
}
+ // TODO sort results for convenience?
Review comment:
I think we probably should, if only to make the results deterministic for testing.
##########
File path: cpp/src/arrow/filesystem/s3fs.cc
##########
@@ -1379,23 +1520,16 @@ class S3FileSystem::Impl {
}
req.SetBucket(ToAwsString(bucket));
req.SetDelete(std::move(del));
- auto outcome = client_->DeleteObjects(req);
- if (!outcome.IsSuccess()) {
- return ErrorToStatus(outcome.GetError());
- }
- // Also need to check per-key errors, even on successful outcome
- // See
- //
https://docs.aws.amazon.com/fr_fr/AmazonS3/latest/API/multiobjectdeleteapi.html
- const auto& errors = outcome.GetResult().GetErrors();
- if (!errors.empty()) {
- std::stringstream ss;
- ss << "Got the following " << errors.size()
- << " errors when deleting objects in S3 bucket '" << bucket <<
"':\n";
- for (const auto& error : errors) {
- ss << "- key '" << error.GetKey() << "': " << error.GetMessage() <<
"\n";
- }
- return Status::IOError(ss.str());
- }
+ deleters.emplace_back();
+ client_->DeleteObjectsAsync(req, deleters.back());
+ }
+
+ std::vector<Future<>*> futures(deleters.size());
+ std::transform(deleters.begin(), deleters.end(), futures.begin(),
+ [](Deleter& del) { return &del.future; });
Review comment:
```suggestion
futures.push_back(&deleters.back().future);
}
```
##########
File path: cpp/src/arrow/filesystem/s3fs.cc
##########
@@ -1357,20 +1462,56 @@ class S3FileSystem::Impl {
error);
};
- RETURN_NOT_OK(
- ListObjectsV2(bucket, key, std::move(handle_results),
std::move(handle_error)));
+ auto handle_recursion = [&](int32_t nesting_depth) -> Result<bool> {
+ if (nesting_depth >= kMaxNestingDepth) {
+ return Status::IOError("S3 filesystem tree exceeds maximum nesting
depth (",
+ kMaxNestingDepth, ")");
+ }
+ return true; // Recurse
+ };
- // Recurse
- for (const auto& child_key : child_keys) {
- RETURN_NOT_OK(
- WalkForDeleteDir(bucket, child_key, nesting_depth + 1, file_keys,
dir_keys));
- }
- return Status::OK();
+ return TreeWalker::Walk(client_.get(), bucket, key, kListObjectsMaxKeys,
+ handle_results, handle_error, handle_recursion);
}
// Delete multiple objects at once
Status DeleteObjects(const std::string& bucket, const
std::vector<std::string>& keys) {
+ struct Deleter {
+ Future<> future;
+
+ Deleter() : future(Future<>::Make()) {}
Review comment:
```suggestion
struct DeleteHandler {
Future<> future = Future<>::Make();
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org