westonpace commented on code in PR #35440:
URL: https://github.com/apache/arrow/pull/35440#discussion_r1267379815
##########
cpp/src/arrow/filesystem/s3fs.cc:
##########
@@ -2312,35 +2287,137 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
futures.push_back(std::move(fut));
}
- return AllComplete(futures);
+ return AllFinished(futures);
}
Status DeleteObjects(const std::string& bucket, const
std::vector<std::string>& keys) {
return DeleteObjectsAsync(bucket, keys).status();
}
+ Future<> EnsureNotFileAsync(const std::string& bucket, const std::string&
key) {
+ if (key.empty()) {
+ // There is no way for a bucket to be a file
+ return Future<>::MakeFinished();
+ }
+ auto self = shared_from_this();
+ return DeferNotOk(SubmitIO(io_context_, [self, bucket, key]() mutable ->
Status {
+ S3Model::HeadObjectRequest req;
+ req.SetBucket(ToAwsString(bucket));
+ req.SetKey(ToAwsString(key));
+
+ ARROW_ASSIGN_OR_RAISE(auto client_lock, self->holder_->Lock());
+ auto outcome = client_lock.Move()->HeadObject(req);
+ if (outcome.IsSuccess()) {
+ const auto& result = outcome.GetResult();
+ if (result.GetContentLength() > 0 || key[key.size() - 1] != '/') {
+ return Status::IOError("Cannot delete directory contents at ",
bucket, kSep,
+ key, " because it is a file");
+ }
+ return Status::OK();
+ }
+ if (IsNotFound(outcome.GetError())) {
+ // Might be ok, let DeleteDirContentsAsync worry about this
+ return Status::OK();
+ } else {
+ return ErrorToStatus(std::forward_as_tuple("When getting information
for key '",
+ key, "' in bucket '",
bucket, "': "),
+ "HeadObject", outcome.GetError());
+ }
+ }));
+ }
+
+ // Some operations require running multiple S3 calls, either in parallel or
serially. We
+ // need to ensure that the S3 filesystem instance stays valid and that S3
isn't
+ // finalized. We do this by wrapping all the tasks in a scheduler which
keeps the
+ // resources alive
+ Future<> RunInScheduler(
+ std::function<Status(util::AsyncTaskScheduler*, S3FileSystem::Impl*)>
callable) {
+ auto self = shared_from_this();
+ FnOnce<Status(util::AsyncTaskScheduler*)> initial_task =
+ [callable = std::move(callable),
+ this](util::AsyncTaskScheduler* scheduler) mutable {
+ return callable(scheduler, this);
+ };
+ Future<> scheduler_fut = util::AsyncTaskScheduler::Make(
+ std::move(initial_task),
+ /*abort_callback=*/
+ [](const Status& st) {
+ // No need for special abort logic.
+ },
+ StopToken::Unstoppable());
Review Comment:
Done. I also added a cancellation test to verify that this works as intended.
Getting that test to pass required a change in one other place as well.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
users@infra.apache.org