westonpace commented on code in PR #35440:
URL: https://github.com/apache/arrow/pull/35440#discussion_r1267379815


##########
cpp/src/arrow/filesystem/s3fs.cc:
##########
@@ -2312,35 +2287,137 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
       futures.push_back(std::move(fut));
     }
 
-    return AllComplete(futures);
+    return AllFinished(futures);
   }
 
   Status DeleteObjects(const std::string& bucket, const std::vector<std::string>& keys) {
     return DeleteObjectsAsync(bucket, keys).status();
   }
 
+  Future<> EnsureNotFileAsync(const std::string& bucket, const std::string& key) {
+    if (key.empty()) {
+      // There is no way for a bucket to be a file
+      return Future<>::MakeFinished();
+    }
+    auto self = shared_from_this();
+    return DeferNotOk(SubmitIO(io_context_, [self, bucket, key]() mutable -> Status {
+      S3Model::HeadObjectRequest req;
+      req.SetBucket(ToAwsString(bucket));
+      req.SetKey(ToAwsString(key));
+
+      ARROW_ASSIGN_OR_RAISE(auto client_lock, self->holder_->Lock());
+      auto outcome = client_lock.Move()->HeadObject(req);
+      if (outcome.IsSuccess()) {
+        const auto& result = outcome.GetResult();
+        if (result.GetContentLength() > 0 || key[key.size() - 1] != '/') {
+          return Status::IOError("Cannot delete directory contents at ", bucket, kSep,
+                                 key, " because it is a file");
+        }
+        return Status::OK();
+      }
+      if (IsNotFound(outcome.GetError())) {
+        // Might be ok, let DeleteDirContentsAsync worry about this
+        return Status::OK();
+      } else {
+        return ErrorToStatus(std::forward_as_tuple("When getting information for key '",
+                                                   key, "' in bucket '", bucket, "': "),
+                             "HeadObject", outcome.GetError());
+      }
+    }));
+  }
+
+  // Some operations require running multiple S3 calls, either in parallel or serially.
+  // We need to ensure that the S3 filesystem instance stays valid and that S3 isn't
+  // finalized.  We do this by wrapping all the tasks in a scheduler which keeps the
+  // resources alive.
+  Future<> RunInScheduler(
+      std::function<Status(util::AsyncTaskScheduler*, S3FileSystem::Impl*)> callable) {
+    auto self = shared_from_this();
+    FnOnce<Status(util::AsyncTaskScheduler*)> initial_task =
+        [callable = std::move(callable),
+         this](util::AsyncTaskScheduler* scheduler) mutable {
+          return callable(scheduler, this);
+        };
+    Future<> scheduler_fut = util::AsyncTaskScheduler::Make(
+        std::move(initial_task),
+        /*abort_callback=*/
+        [](const Status& st) {
+          // No need for special abort logic.
+        },
+        StopToken::Unstoppable());

Review Comment:
   Done.  I also added a test for cancellation to make sure it works correctly.  There was one other spot I had to change to get it to pass.
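
   For context, a cancellation test for this kind of scheduler-backed operation has roughly the following shape. This is a minimal, generic sketch (plain C++ with hypothetical stand-ins; it is not the PR's actual GTest case): start a multi-step async operation, request a stop partway through, and assert that it aborts early instead of running to completion.

   ```cpp
   #include <atomic>
   #include <cassert>
   #include <chrono>
   #include <future>
   #include <thread>

   int main() {
     std::atomic<bool> stop_requested{false};  // stand-in for a stop token
     std::atomic<int> tasks_run{0};

     // Stand-in for a scheduler-backed multi-call operation: it keeps issuing
     // "S3 calls" until it observes a stop request, then reports how far it got.
     auto op = std::async(std::launch::async, [&] {
       for (int i = 0; i < 100; ++i) {
         if (stop_requested.load()) return false;  // aborted early
         ++tasks_run;
         std::this_thread::sleep_for(std::chrono::milliseconds(1));
       }
       return true;  // ran to completion
     });

     std::this_thread::sleep_for(std::chrono::milliseconds(5));
     stop_requested.store(true);  // the "cancel" step of the test

     bool completed = op.get();
     assert(!completed);              // the operation observed the stop
     assert(tasks_run.load() < 100);  // and did not run every task
     return 0;
   }
   ```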
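
   Separately, the comment in the diff above ("wrapping all the tasks in a scheduler which keeps the resources alive") relies on a lifetime idiom worth spelling out. Below is a minimal, generic sketch of it (plain C++; `Impl` and `RunAsync` are hypothetical names, not Arrow's code): the task closure copies `shared_from_this()`, so the object stays alive until the async work finishes, even if the caller drops its reference first.

   ```cpp
   #include <future>
   #include <iostream>
   #include <memory>

   class Impl : public std::enable_shared_from_this<Impl> {
    public:
     ~Impl() { std::cout << "Impl destroyed\n"; }

     std::future<void> RunAsync() {
       // The closure's copy of `self` is a strong reference: *this cannot be
       // destroyed while the task is still in flight.
       auto self = shared_from_this();
       return std::async(std::launch::async, [self]() {
         std::cout << "task still has a live Impl\n";
       });
     }
   };

   int main() {
     std::future<void> fut;
     {
       auto impl = std::make_shared<Impl>();
       fut = impl->RunAsync();
     }  // the caller's reference is gone here; the task's copy keeps Impl alive
     fut.wait();  // "task still has a live Impl" prints before "Impl destroyed"
     return 0;
   }
   ```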


