kou commented on issue #36346:
URL: https://github.com/apache/arrow/issues/36346#issuecomment-1614000816

   How about just using `AwsInstance::is_finalized_` instead of introducing a shared-exclusive mutex?
   
   ```diff
   diff --git a/cpp/src/arrow/filesystem/s3fs.cc b/cpp/src/arrow/filesystem/s3fs.cc
   index c3a6eb0ea..08e4bedea 100644
   --- a/cpp/src/arrow/filesystem/s3fs.cc
   +++ b/cpp/src/arrow/filesystem/s3fs.cc
   @@ -398,12 +398,19 @@ namespace {
    Status CheckS3Initialized() {
      if (!IsS3Initialized()) {
        return Status::Invalid(
   -        "S3 subsystem not initialized; please call InitializeS3() "
   +        "S3 subsystem is not initialized; please call InitializeS3() "
            "before carrying out any S3-related operation");
      }
      return Status::OK();
    }
    
   +Status CheckS3Finalized() {
   +  if (IsS3Finalized()) {
   +    return Status::Invalid("S3 subsystem is finalized");
   +  }
   +  return Status::OK();
   +}
   +
    // XXX Sanitize paths by removing leading slash?
    
    struct S3Path {
   @@ -1008,6 +1015,8 @@ class ObjectInputFile final : public io::RandomAccessFile {
            content_length_(size) {}
    
      Status Init() {
   +    RETURN_NOT_OK(CheckS3Finalized());
   +
        // Issue a HEAD Object to get the content-length and ensure any
        // errors (e.g. file not found) don't wait until the first Read() call.
        if (content_length_ != kNoSize) {
   @@ -1099,6 +1108,8 @@ class ObjectInputFile final : public io::RandomAccessFile {
          return 0;
        }
    
   +    RETURN_NOT_OK(CheckS3Finalized());
   +
        // Read the desired range of bytes
        ARROW_ASSIGN_OR_RAISE(S3Model::GetObjectResult result,
                              GetObjectRange(client_.get(), path_, position, nbytes, out));
   @@ -1182,6 +1193,8 @@ class ObjectOutputStream final : public io::OutputStream {
      }
    
      Status Init() {
   +    RETURN_NOT_OK(CheckS3Finalized());
   +
        // Initiate the multi-part upload
        S3Model::CreateMultipartUploadRequest req;
        req.SetBucket(ToAwsString(path_.bucket));
   @@ -1217,6 +1230,8 @@ class ObjectOutputStream final : public io::OutputStream {
          return Status::OK();
        }
    
   +    RETURN_NOT_OK(CheckS3Finalized());
   +
        S3Model::AbortMultipartUploadRequest req;
        req.SetBucket(ToAwsString(path_.bucket));
        req.SetKey(ToAwsString(path_.key));
   @@ -1245,6 +1260,8 @@ class ObjectOutputStream final : public io::OutputStream {
      Future<> CloseAsync() override {
        if (closed_) return Status::OK();
    
   +    RETURN_NOT_OK(CheckS3Finalized());
   +
        if (current_part_) {
          // Upload last part
          RETURN_NOT_OK(CommitCurrentPart());
   @@ -1307,6 +1324,8 @@ class ObjectOutputStream final : public io::OutputStream {
          return Status::Invalid("Operation on closed stream");
        }
    
   +    RETURN_NOT_OK(CheckS3Finalized());
   +
        const int8_t* data_ptr = reinterpret_cast<const int8_t*>(data);
        auto advance_ptr = [&data_ptr, &nbytes](const int64_t offset) {
          data_ptr += offset;
   @@ -1359,6 +1378,7 @@ class ObjectOutputStream final : public io::OutputStream {
        if (closed_) {
          return Status::Invalid("Operation on closed stream");
        }
   +    RETURN_NOT_OK(CheckS3Finalized());
        // Wait for background writes to finish
        std::unique_lock<std::mutex> lock(upload_state_->mutex);
        return upload_state_->pending_parts_completed;
   @@ -1367,6 +1387,7 @@ class ObjectOutputStream final : public io::OutputStream {
      // Upload-related helpers
    
      Status CommitCurrentPart() {
   +    RETURN_NOT_OK(CheckS3Finalized());
        ARROW_ASSIGN_OR_RAISE(auto buf, current_part_->Finish());
        current_part_.reset();
        current_part_size_ = 0;
   @@ -1379,6 +1400,8 @@ class ObjectOutputStream final : public io::OutputStream {
    
      Status UploadPart(const void* data, int64_t nbytes,
                        std::shared_ptr<Buffer> owned_buffer = nullptr) {
   +    RETURN_NOT_OK(CheckS3Finalized());
   +
        S3Model::UploadPartRequest req;
        req.SetBucket(ToAwsString(path_.bucket));
        req.SetKey(ToAwsString(path_.key));
   @@ -1574,6 +1597,8 @@ struct TreeWalker : public std::enable_shared_from_this<TreeWalker> {
        S3Model::ListObjectsV2Request req;
    
        Status operator()(const Result<S3Model::ListObjectsV2Outcome>& result) {
   +      RETURN_NOT_OK(CheckS3Finalized());
   +
          // Serialize calls to operation-specific handlers
          if (!walker->ok()) {
            // Early exit: avoid executing handlers if DoWalk() returned
   @@ -1692,6 +1717,8 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
    
      // Tests to see if a bucket exists
      Result<bool> BucketExists(const std::string& bucket) {
   +    RETURN_NOT_OK(CheckS3Finalized());
   +
        S3Model::HeadBucketRequest req;
        req.SetBucket(ToAwsString(bucket));
    
   @@ -1709,6 +1736,8 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
    
      // Create a bucket.  Successful if bucket already exists.
      Status CreateBucket(const std::string& bucket) {
   +    RETURN_NOT_OK(CheckS3Finalized());
   +
        // Check bucket exists first.
        {
          S3Model::HeadBucketRequest req;
   @@ -1753,6 +1782,8 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
    
      // Create an object with empty contents.  Successful if object already exists.
      Status CreateEmptyObject(const std::string& bucket, const std::string& key) {
   +    RETURN_NOT_OK(CheckS3Finalized());
   +
        S3Model::PutObjectRequest req;
        req.SetBucket(ToAwsString(bucket));
        req.SetKey(ToAwsString(key));
   @@ -1768,6 +1799,7 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
      }
    
      Status DeleteObject(const std::string& bucket, const std::string& key) {
   +    RETURN_NOT_OK(CheckS3Finalized());
        S3Model::DeleteObjectRequest req;
        req.SetBucket(ToAwsString(bucket));
        req.SetKey(ToAwsString(key));
   @@ -1777,6 +1809,7 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
      }
    
      Status CopyObject(const S3Path& src_path, const S3Path& dest_path) {
   +    RETURN_NOT_OK(CheckS3Finalized());
        S3Model::CopyObjectRequest req;
        req.SetBucket(ToAwsString(dest_path.bucket));
        req.SetKey(ToAwsString(dest_path.key));
   @@ -1799,6 +1832,8 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
      Result<bool> IsEmptyDirectory(
          const std::string& bucket, const std::string& key,
          const S3Model::HeadObjectOutcome* previous_outcome = nullptr) {
   +    RETURN_NOT_OK(CheckS3Finalized());
   +
        if (previous_outcome) {
          // Fetch the backend from the previous error
          DCHECK(!previous_outcome->IsSuccess());
   @@ -1850,6 +1885,7 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
      }
    
      Result<bool> IsNonEmptyDirectory(const S3Path& path) {
   +    RETURN_NOT_OK(CheckS3Finalized());
        S3Model::ListObjectsV2Request req;
        req.SetBucket(ToAwsString(path.bucket));
        req.SetPrefix(ToAwsString(path.key) + kSep);
   @@ -1939,6 +1975,8 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
      // Workhorse for GetFileInfo(FileSelector...)
      Status Walk(const FileSelector& select, const std::string& bucket,
                  const std::string& key, std::vector<FileInfo>* out) {
   +    RETURN_NOT_OK(CheckS3Finalized());
   +
        FileInfoCollector collector(bucket, key, select);
    
        auto handle_error = [&](const AWSError<S3Errors>& error) -> Status {
   @@ -1974,6 +2012,7 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
      // Workhorse for GetFileInfoGenerator(FileSelector...)
      FileInfoGenerator WalkAsync(const FileSelector& select, const std::string& bucket,
                                  const std::string& key) {
   +    RETURN_NOT_OK(CheckS3Finalized());
        PushGenerator<std::vector<FileInfo>> gen;
        auto producer = gen.producer();
        auto collector = std::make_shared<FileInfoCollector>(bucket, key, select);
   @@ -2027,6 +2066,8 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
      };
      Future<std::shared_ptr<WalkResult>> WalkForDeleteDirAsync(const std::string& bucket,
                                                                const std::string& key) {
   +    RETURN_NOT_OK(CheckS3Finalized());
   +
        auto state = std::make_shared<WalkResult>();
    
        auto handle_results = [state](const std::string& prefix,
   @@ -2064,6 +2105,8 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
      // Delete multiple objects at once
      Future<> DeleteObjectsAsync(const std::string& bucket,
                                  const std::vector<std::string>& keys) {
   +    RETURN_NOT_OK(CheckS3Finalized());
   +
        struct DeleteCallback {
          const std::string bucket;
    
   @@ -2156,6 +2199,7 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
    
      static Result<std::vector<std::string>> ProcessListBuckets(
          const Aws::S3::Model::ListBucketsOutcome& outcome) {
   +    RETURN_NOT_OK(CheckS3Finalized());
        if (!outcome.IsSuccess()) {
          return ErrorToStatus(std::forward_as_tuple("When listing buckets: "), "ListBuckets",
                               outcome.GetError());
   @@ -2169,11 +2213,13 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
      }
    
      Result<std::vector<std::string>> ListBuckets() {
   +    RETURN_NOT_OK(CheckS3Finalized());
        auto outcome = client_->ListBuckets();
        return ProcessListBuckets(outcome);
      }
    
      Future<std::vector<std::string>> ListBucketsAsync(io::IOContext ctx) {
   +    RETURN_NOT_OK(CheckS3Finalized());
        auto self = shared_from_this();
        return DeferNotOk(SubmitIO(ctx, [self]() { return self->client_->ListBuckets(); }))
            // TODO(ARROW-12655) Change to Then(Impl::ProcessListBuckets)
   @@ -2187,6 +2233,7 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
        ARROW_RETURN_NOT_OK(internal::AssertNoTrailingSlash(s));
        ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s));
        RETURN_NOT_OK(ValidateFilePath(path));
   +    RETURN_NOT_OK(CheckS3Finalized());
    
        auto ptr = std::make_shared<ObjectInputFile>(client_, fs->io_context(), path);
        RETURN_NOT_OK(ptr->Init());
   @@ -2205,6 +2252,7 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
    
        ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(info.path()));
        RETURN_NOT_OK(ValidateFilePath(path));
   +    RETURN_NOT_OK(CheckS3Finalized());
    
        auto ptr =
            std::make_shared<ObjectInputFile>(client_, fs->io_context(), path, info.size());
   @@ -2223,6 +2271,7 @@ S3FileSystem::~S3FileSystem() {}
    Result<std::shared_ptr<S3FileSystem>> S3FileSystem::Make(
        const S3Options& options, const io::IOContext& io_context) {
      RETURN_NOT_OK(CheckS3Initialized());
   +  RETURN_NOT_OK(CheckS3Finalized());
    
      std::shared_ptr<S3FileSystem> ptr(new S3FileSystem(options, io_context));
      RETURN_NOT_OK(ptr->impl_->Init());
   @@ -2250,6 +2299,8 @@ S3Options S3FileSystem::options() const { return impl_->options(); }
    std::string S3FileSystem::region() const { return impl_->region(); }
    
    Result<FileInfo> S3FileSystem::GetFileInfo(const std::string& s) {
   +  RETURN_NOT_OK(CheckS3Finalized());
   +
      ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s));
      FileInfo info;
      info.set_path(s);
   @@ -2313,6 +2364,8 @@ Result<FileInfo> S3FileSystem::GetFileInfo(const std::string& s) {
    }
    
    Result<FileInfoVector> S3FileSystem::GetFileInfo(const FileSelector& select) {
   +  RETURN_NOT_OK(CheckS3Finalized());
   +
      ARROW_ASSIGN_OR_RAISE(auto base_path, S3Path::FromString(select.base_dir));
    
      FileInfoVector results;
   @@ -2338,6 +2391,8 @@ Result<FileInfoVector> S3FileSystem::GetFileInfo(const FileSelector& select) {
    }
    
    FileInfoGenerator S3FileSystem::GetFileInfoGenerator(const FileSelector& select) {
   +  RETURN_NOT_OK(CheckS3Finalized());
   +
      auto maybe_base_path = S3Path::FromString(select.base_dir);
      if (!maybe_base_path.ok()) {
        return MakeFailingGenerator<FileInfoVector>(maybe_base_path.status());
   @@ -2383,6 +2438,8 @@ FileInfoGenerator S3FileSystem::GetFileInfoGenerator(const FileSelector& select)
    }
    
    Status S3FileSystem::CreateDir(const std::string& s, bool recursive) {
   +  RETURN_NOT_OK(CheckS3Finalized());
   +
      ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s));
    
      if (path.key.empty()) {
   @@ -2426,6 +2483,8 @@ Status S3FileSystem::CreateDir(const std::string& s, bool recursive) {
    }
    
    Status S3FileSystem::DeleteDir(const std::string& s) {
   +  RETURN_NOT_OK(CheckS3Finalized());
   +
      ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s));
    
      if (path.empty()) {
   @@ -2455,6 +2514,8 @@ Status S3FileSystem::DeleteDirContents(const std::string& s, bool missing_dir_ok
    }
    
    Future<> S3FileSystem::DeleteDirContentsAsync(const std::string& s, bool missing_dir_ok) {
   +  RETURN_NOT_OK(CheckS3Finalized());
   +
      ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s));
    
      if (path.empty()) {
   @@ -2480,6 +2541,8 @@ Status S3FileSystem::DeleteRootDirContents() {
    }
    
    Status S3FileSystem::DeleteFile(const std::string& s) {
   +  RETURN_NOT_OK(CheckS3Finalized());
   +
      ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s));
      RETURN_NOT_OK(ValidateFilePath(path));
    
   @@ -2506,6 +2569,8 @@ Status S3FileSystem::DeleteFile(const std::string& s) {
    }
    
    Status S3FileSystem::Move(const std::string& src, const std::string& dest) {
   +  RETURN_NOT_OK(CheckS3Finalized());
   +
      // XXX We don't implement moving directories as it would be too expensive:
      // one must copy all directory contents one by one (including object data),
      // then delete the original contents.
   @@ -2525,6 +2590,8 @@ Status S3FileSystem::Move(const std::string& src, const std::string& dest) {
    }
    
    Status S3FileSystem::CopyFile(const std::string& src, const std::string& dest) {
   +  RETURN_NOT_OK(CheckS3Finalized());
   +
      ARROW_ASSIGN_OR_RAISE(auto src_path, S3Path::FromString(src));
      RETURN_NOT_OK(ValidateFilePath(src_path));
      ARROW_ASSIGN_OR_RAISE(auto dest_path, S3Path::FromString(dest));
   @@ -2562,6 +2629,8 @@ Result<std::shared_ptr<io::OutputStream>> S3FileSystem::OpenOutputStream(
      ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s));
      RETURN_NOT_OK(ValidateFilePath(path));
    
   +  RETURN_NOT_OK(CheckS3Finalized());
   +
      auto ptr = std::make_shared<ObjectOutputStream>(impl_->client_, io_context(), path,
                                                      impl_->options(), metadata);
      RETURN_NOT_OK(ptr->Init());
   @@ -2600,6 +2669,8 @@ struct AwsInstance : public ::arrow::internal::Executor::Resource {
    
      bool IsInitialized() { return !is_finalized_ && is_initialized_; }
    
   +  bool IsFinalized() { return is_finalized_; }
   +
      void Finalize(bool from_destructor = false) {
        bool expected = true;
        is_finalized_.store(true);
   @@ -2608,9 +2679,9 @@ struct AwsInstance : public ::arrow::internal::Executor::Resource {
            ARROW_LOG(WARNING)
                << " arrow::fs::FinalizeS3 was not called even though S3 was initialized.  "
                   "This could lead to a segmentation fault at exit";
   -        RegionResolver::ResetDefaultInstance();
   -        Aws::ShutdownAPI(aws_options_);
          }
   +      RegionResolver::ResetDefaultInstance();
   +      Aws::ShutdownAPI(aws_options_);
        }
      }
    
   @@ -2672,9 +2743,6 @@ struct AwsInstance : public ::arrow::internal::Executor::Resource {
    
    std::shared_ptr<AwsInstance> CreateAwsInstance() {
      auto instance = std::make_shared<AwsInstance>();
   -  // Don't let S3 be shutdown until all Arrow threads are done using it
   -  arrow::internal::GetCpuThreadPool()->KeepAlive(instance);
   -  io::internal::GetIOThreadPool()->KeepAlive(instance);
      return instance;
    }
    
   @@ -2713,6 +2781,8 @@ Status EnsureS3Finalized() { return FinalizeS3(); }
    
    bool IsS3Initialized() { return GetAwsInstance().IsInitialized(); }
    
   +bool IsS3Finalized() { return GetAwsInstance().IsFinalized(); }
   +
    // -----------------------------------------------------------------------
    // Top-level utility functions
    
   ```
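   For illustration, a minimal, self-contained sketch (not Arrow code; `Subsystem`, `CheckFinalized()` and `DoOperation()` are illustrative names) of the pattern the diff applies: each operation first consults an atomic finalization flag and fails fast once finalization has started, instead of acquiring a shared-exclusive lock per call.

   ```cpp
   // Sketch only: gate every operation on an atomic "finalized" flag.
   #include <atomic>
   #include <iostream>
   #include <string>

   // Simplified stand-in for arrow::Status.
   struct Status {
     bool ok;
     std::string message;
     static Status OK() { return {true, ""}; }
     static Status Invalid(std::string msg) { return {false, std::move(msg)}; }
   };

   class Subsystem {
    public:
     // Analogue of the proposed CheckS3Finalized(): reject new work once
     // Finalize() has been called.
     Status CheckFinalized() const {
       if (is_finalized_.load()) {
         return Status::Invalid("subsystem is finalized");
       }
       return Status::OK();
     }

     Status DoOperation() {
       Status st = CheckFinalized();
       if (!st.ok) return st;
       // ... issue the actual request here ...
       return Status::OK();
     }

     void Finalize() { is_finalized_.store(true); }

    private:
     std::atomic<bool> is_finalized_{false};
   };

   int main() {
     Subsystem s;
     std::cout << s.DoOperation().ok << "\n";       // 1: allowed before Finalize()
     s.Finalize();
     std::cout << s.DoOperation().message << "\n";  // rejected after finalization
   }
   ```

   One trade-off versus the shared-exclusive mutex approach: the flag check only rejects operations that start after finalization; by itself it does not make `Finalize()` wait for operations that are already in flight.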

