kou commented on issue #36346:
URL: https://github.com/apache/arrow/issues/36346#issuecomment-1614000816
How about just using `AwsInstance::is_finalized_` instead of introducing a shared-exclusive mutex?
```diff
diff --git a/cpp/src/arrow/filesystem/s3fs.cc b/cpp/src/arrow/filesystem/s3fs.cc
index c3a6eb0ea..08e4bedea 100644
--- a/cpp/src/arrow/filesystem/s3fs.cc
+++ b/cpp/src/arrow/filesystem/s3fs.cc
@@ -398,12 +398,19 @@ namespace {
Status CheckS3Initialized() {
if (!IsS3Initialized()) {
return Status::Invalid(
- "S3 subsystem not initialized; please call InitializeS3() "
+ "S3 subsystem is not initialized; please call InitializeS3() "
"before carrying out any S3-related operation");
}
return Status::OK();
}
+Status CheckS3Finalized() {
+ if (IsS3Finalized()) {
+ return Status::Invalid("S3 subsystem is finalized");
+ }
+ return Status::OK();
+}
+
// XXX Sanitize paths by removing leading slash?
struct S3Path {
@@ -1008,6 +1015,8 @@ class ObjectInputFile final : public io::RandomAccessFile {
content_length_(size) {}
Status Init() {
+ RETURN_NOT_OK(CheckS3Finalized());
+
// Issue a HEAD Object to get the content-length and ensure any
// errors (e.g. file not found) don't wait until the first Read() call.
if (content_length_ != kNoSize) {
@@ -1099,6 +1108,8 @@ class ObjectInputFile final : public io::RandomAccessFile {
return 0;
}
+ RETURN_NOT_OK(CheckS3Finalized());
+
// Read the desired range of bytes
ARROW_ASSIGN_OR_RAISE(S3Model::GetObjectResult result,
GetObjectRange(client_.get(), path_, position, nbytes, out));
@@ -1182,6 +1193,8 @@ class ObjectOutputStream final : public io::OutputStream {
}
Status Init() {
+ RETURN_NOT_OK(CheckS3Finalized());
+
// Initiate the multi-part upload
S3Model::CreateMultipartUploadRequest req;
req.SetBucket(ToAwsString(path_.bucket));
@@ -1217,6 +1230,8 @@ class ObjectOutputStream final : public io::OutputStream {
return Status::OK();
}
+ RETURN_NOT_OK(CheckS3Finalized());
+
S3Model::AbortMultipartUploadRequest req;
req.SetBucket(ToAwsString(path_.bucket));
req.SetKey(ToAwsString(path_.key));
@@ -1245,6 +1260,8 @@ class ObjectOutputStream final : public io::OutputStream {
Future<> CloseAsync() override {
if (closed_) return Status::OK();
+ RETURN_NOT_OK(CheckS3Finalized());
+
if (current_part_) {
// Upload last part
RETURN_NOT_OK(CommitCurrentPart());
@@ -1307,6 +1324,8 @@ class ObjectOutputStream final : public io::OutputStream {
return Status::Invalid("Operation on closed stream");
}
+ RETURN_NOT_OK(CheckS3Finalized());
+
const int8_t* data_ptr = reinterpret_cast<const int8_t*>(data);
auto advance_ptr = [&data_ptr, &nbytes](const int64_t offset) {
data_ptr += offset;
@@ -1359,6 +1378,7 @@ class ObjectOutputStream final : public io::OutputStream {
if (closed_) {
return Status::Invalid("Operation on closed stream");
}
+ RETURN_NOT_OK(CheckS3Finalized());
// Wait for background writes to finish
std::unique_lock<std::mutex> lock(upload_state_->mutex);
return upload_state_->pending_parts_completed;
@@ -1367,6 +1387,7 @@ class ObjectOutputStream final : public io::OutputStream {
// Upload-related helpers
Status CommitCurrentPart() {
+ RETURN_NOT_OK(CheckS3Finalized());
ARROW_ASSIGN_OR_RAISE(auto buf, current_part_->Finish());
current_part_.reset();
current_part_size_ = 0;
@@ -1379,6 +1400,8 @@ class ObjectOutputStream final : public io::OutputStream {
Status UploadPart(const void* data, int64_t nbytes,
std::shared_ptr<Buffer> owned_buffer = nullptr) {
+ RETURN_NOT_OK(CheckS3Finalized());
+
S3Model::UploadPartRequest req;
req.SetBucket(ToAwsString(path_.bucket));
req.SetKey(ToAwsString(path_.key));
@@ -1574,6 +1597,8 @@ struct TreeWalker : public std::enable_shared_from_this<TreeWalker> {
S3Model::ListObjectsV2Request req;
Status operator()(const Result<S3Model::ListObjectsV2Outcome>& result) {
+ RETURN_NOT_OK(CheckS3Finalized());
+
// Serialize calls to operation-specific handlers
if (!walker->ok()) {
// Early exit: avoid executing handlers if DoWalk() returned
@@ -1692,6 +1717,8 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
// Tests to see if a bucket exists
Result<bool> BucketExists(const std::string& bucket) {
+ RETURN_NOT_OK(CheckS3Finalized());
+
S3Model::HeadBucketRequest req;
req.SetBucket(ToAwsString(bucket));
@@ -1709,6 +1736,8 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
// Create a bucket. Successful if bucket already exists.
Status CreateBucket(const std::string& bucket) {
+ RETURN_NOT_OK(CheckS3Finalized());
+
// Check bucket exists first.
{
S3Model::HeadBucketRequest req;
@@ -1753,6 +1782,8 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
// Create an object with empty contents. Successful if object already exists.
Status CreateEmptyObject(const std::string& bucket, const std::string& key) {
+ RETURN_NOT_OK(CheckS3Finalized());
+
S3Model::PutObjectRequest req;
req.SetBucket(ToAwsString(bucket));
req.SetKey(ToAwsString(key));
@@ -1768,6 +1799,7 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
}
Status DeleteObject(const std::string& bucket, const std::string& key) {
+ RETURN_NOT_OK(CheckS3Finalized());
S3Model::DeleteObjectRequest req;
req.SetBucket(ToAwsString(bucket));
req.SetKey(ToAwsString(key));
@@ -1777,6 +1809,7 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
}
Status CopyObject(const S3Path& src_path, const S3Path& dest_path) {
+ RETURN_NOT_OK(CheckS3Finalized());
S3Model::CopyObjectRequest req;
req.SetBucket(ToAwsString(dest_path.bucket));
req.SetKey(ToAwsString(dest_path.key));
@@ -1799,6 +1832,8 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
Result<bool> IsEmptyDirectory(
const std::string& bucket, const std::string& key,
const S3Model::HeadObjectOutcome* previous_outcome = nullptr) {
+ RETURN_NOT_OK(CheckS3Finalized());
+
if (previous_outcome) {
// Fetch the backend from the previous error
DCHECK(!previous_outcome->IsSuccess());
@@ -1850,6 +1885,7 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
}
Result<bool> IsNonEmptyDirectory(const S3Path& path) {
+ RETURN_NOT_OK(CheckS3Finalized());
S3Model::ListObjectsV2Request req;
req.SetBucket(ToAwsString(path.bucket));
req.SetPrefix(ToAwsString(path.key) + kSep);
@@ -1939,6 +1975,8 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
// Workhorse for GetFileInfo(FileSelector...)
Status Walk(const FileSelector& select, const std::string& bucket,
const std::string& key, std::vector<FileInfo>* out) {
+ RETURN_NOT_OK(CheckS3Finalized());
+
FileInfoCollector collector(bucket, key, select);
auto handle_error = [&](const AWSError<S3Errors>& error) -> Status {
@@ -1974,6 +2012,7 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
// Workhorse for GetFileInfoGenerator(FileSelector...)
FileInfoGenerator WalkAsync(const FileSelector& select, const std::string& bucket,
const std::string& key) {
+ RETURN_NOT_OK(CheckS3Finalized());
PushGenerator<std::vector<FileInfo>> gen;
auto producer = gen.producer();
auto collector = std::make_shared<FileInfoCollector>(bucket, key, select);
@@ -2027,6 +2066,8 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
};
Future<std::shared_ptr<WalkResult>> WalkForDeleteDirAsync(const std::string& bucket,
const std::string& key) {
+ RETURN_NOT_OK(CheckS3Finalized());
+
auto state = std::make_shared<WalkResult>();
auto handle_results = [state](const std::string& prefix,
@@ -2064,6 +2105,8 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
// Delete multiple objects at once
Future<> DeleteObjectsAsync(const std::string& bucket,
const std::vector<std::string>& keys) {
+ RETURN_NOT_OK(CheckS3Finalized());
+
struct DeleteCallback {
const std::string bucket;
@@ -2156,6 +2199,7 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
static Result<std::vector<std::string>> ProcessListBuckets(
const Aws::S3::Model::ListBucketsOutcome& outcome) {
+ RETURN_NOT_OK(CheckS3Finalized());
if (!outcome.IsSuccess()) {
return ErrorToStatus(std::forward_as_tuple("When listing buckets: "), "ListBuckets", outcome.GetError());
@@ -2169,11 +2213,13 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
}
Result<std::vector<std::string>> ListBuckets() {
+ RETURN_NOT_OK(CheckS3Finalized());
auto outcome = client_->ListBuckets();
return ProcessListBuckets(outcome);
}
Future<std::vector<std::string>> ListBucketsAsync(io::IOContext ctx) {
+ RETURN_NOT_OK(CheckS3Finalized());
auto self = shared_from_this();
return DeferNotOk(SubmitIO(ctx, [self]() { return self->client_->ListBuckets(); }))
// TODO(ARROW-12655) Change to Then(Impl::ProcessListBuckets)
@@ -2187,6 +2233,7 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
ARROW_RETURN_NOT_OK(internal::AssertNoTrailingSlash(s));
ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s));
RETURN_NOT_OK(ValidateFilePath(path));
+ RETURN_NOT_OK(CheckS3Finalized());
auto ptr = std::make_shared<ObjectInputFile>(client_, fs->io_context(), path);
RETURN_NOT_OK(ptr->Init());
@@ -2205,6 +2252,7 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(info.path()));
RETURN_NOT_OK(ValidateFilePath(path));
+ RETURN_NOT_OK(CheckS3Finalized());
auto ptr =
std::make_shared<ObjectInputFile>(client_, fs->io_context(), path, info.size());
@@ -2223,6 +2271,7 @@ S3FileSystem::~S3FileSystem() {}
Result<std::shared_ptr<S3FileSystem>> S3FileSystem::Make(
const S3Options& options, const io::IOContext& io_context) {
RETURN_NOT_OK(CheckS3Initialized());
+ RETURN_NOT_OK(CheckS3Finalized());
std::shared_ptr<S3FileSystem> ptr(new S3FileSystem(options, io_context));
RETURN_NOT_OK(ptr->impl_->Init());
@@ -2250,6 +2299,8 @@ S3Options S3FileSystem::options() const { return impl_->options(); }
std::string S3FileSystem::region() const { return impl_->region(); }
Result<FileInfo> S3FileSystem::GetFileInfo(const std::string& s) {
+ RETURN_NOT_OK(CheckS3Finalized());
+
ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s));
FileInfo info;
info.set_path(s);
@@ -2313,6 +2364,8 @@ Result<FileInfo> S3FileSystem::GetFileInfo(const std::string& s) {
}
Result<FileInfoVector> S3FileSystem::GetFileInfo(const FileSelector& select) {
+ RETURN_NOT_OK(CheckS3Finalized());
+
ARROW_ASSIGN_OR_RAISE(auto base_path, S3Path::FromString(select.base_dir));
FileInfoVector results;
@@ -2338,6 +2391,8 @@ Result<FileInfoVector> S3FileSystem::GetFileInfo(const FileSelector& select) {
}
FileInfoGenerator S3FileSystem::GetFileInfoGenerator(const FileSelector& select) {
+ RETURN_NOT_OK(CheckS3Finalized());
+
auto maybe_base_path = S3Path::FromString(select.base_dir);
if (!maybe_base_path.ok()) {
return MakeFailingGenerator<FileInfoVector>(maybe_base_path.status());
@@ -2383,6 +2438,8 @@ FileInfoGenerator S3FileSystem::GetFileInfoGenerator(const FileSelector& select)
}
Status S3FileSystem::CreateDir(const std::string& s, bool recursive) {
+ RETURN_NOT_OK(CheckS3Finalized());
+
ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s));
if (path.key.empty()) {
@@ -2426,6 +2483,8 @@ Status S3FileSystem::CreateDir(const std::string& s, bool recursive) {
}
Status S3FileSystem::DeleteDir(const std::string& s) {
+ RETURN_NOT_OK(CheckS3Finalized());
+
ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s));
if (path.empty()) {
@@ -2455,6 +2514,8 @@ Status S3FileSystem::DeleteDirContents(const std::string& s, bool missing_dir_ok
}
Future<> S3FileSystem::DeleteDirContentsAsync(const std::string& s, bool missing_dir_ok) {
+ RETURN_NOT_OK(CheckS3Finalized());
+
ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s));
if (path.empty()) {
@@ -2480,6 +2541,8 @@ Status S3FileSystem::DeleteRootDirContents() {
}
Status S3FileSystem::DeleteFile(const std::string& s) {
+ RETURN_NOT_OK(CheckS3Finalized());
+
ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s));
RETURN_NOT_OK(ValidateFilePath(path));
@@ -2506,6 +2569,8 @@ Status S3FileSystem::DeleteFile(const std::string& s) {
}
Status S3FileSystem::Move(const std::string& src, const std::string& dest) {
+ RETURN_NOT_OK(CheckS3Finalized());
+
// XXX We don't implement moving directories as it would be too expensive:
// one must copy all directory contents one by one (including object data),
// then delete the original contents.
@@ -2525,6 +2590,8 @@ Status S3FileSystem::Move(const std::string& src, const std::string& dest) {
}
Status S3FileSystem::CopyFile(const std::string& src, const std::string& dest) {
+ RETURN_NOT_OK(CheckS3Finalized());
+
ARROW_ASSIGN_OR_RAISE(auto src_path, S3Path::FromString(src));
RETURN_NOT_OK(ValidateFilePath(src_path));
ARROW_ASSIGN_OR_RAISE(auto dest_path, S3Path::FromString(dest));
@@ -2562,6 +2629,8 @@ Result<std::shared_ptr<io::OutputStream>> S3FileSystem::OpenOutputStream(
ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s));
RETURN_NOT_OK(ValidateFilePath(path));
+ RETURN_NOT_OK(CheckS3Finalized());
+
auto ptr = std::make_shared<ObjectOutputStream>(impl_->client_, io_context(), path,
impl_->options(), metadata);
RETURN_NOT_OK(ptr->Init());
@@ -2600,6 +2669,8 @@ struct AwsInstance : public ::arrow::internal::Executor::Resource {
bool IsInitialized() { return !is_finalized_ && is_initialized_; }
+ bool IsFinalized() { return is_finalized_; }
+
void Finalize(bool from_destructor = false) {
bool expected = true;
is_finalized_.store(true);
@@ -2608,9 +2679,9 @@ struct AwsInstance : public ::arrow::internal::Executor::Resource {
ARROW_LOG(WARNING)
<< " arrow::fs::FinalizeS3 was not called even though S3 was
initialized. "
"This could lead to a segmentation fault at exit";
- RegionResolver::ResetDefaultInstance();
- Aws::ShutdownAPI(aws_options_);
}
+ RegionResolver::ResetDefaultInstance();
+ Aws::ShutdownAPI(aws_options_);
}
}
@@ -2672,9 +2743,6 @@ struct AwsInstance : public ::arrow::internal::Executor::Resource {
std::shared_ptr<AwsInstance> CreateAwsInstance() {
auto instance = std::make_shared<AwsInstance>();
- // Don't let S3 be shutdown until all Arrow threads are done using it
- arrow::internal::GetCpuThreadPool()->KeepAlive(instance);
- io::internal::GetIOThreadPool()->KeepAlive(instance);
return instance;
}
@@ -2713,6 +2781,8 @@ Status EnsureS3Finalized() { return FinalizeS3(); }
bool IsS3Initialized() { return GetAwsInstance().IsInitialized(); }
+bool IsS3Finalized() { return GetAwsInstance().IsFinalized(); }
+
// -----------------------------------------------------------------------
// Top-level utility functions
```
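For reference, here is a minimal standalone sketch of the idea outside of Arrow (the `InstanceState` type and helper names below are illustrative stand-ins, not the actual Arrow classes): every operation starts with a cheap atomic load of a finalized flag instead of holding a shared lock for its duration.

```cpp
// Minimal sketch, standard library only: gate operations on an atomic
// "finalized" flag rather than a std::shared_mutex.
#include <atomic>
#include <iostream>
#include <string>

// Hypothetical stand-in for AwsInstance; only the flag logic is shown.
struct InstanceState {
  std::atomic<bool> is_initialized{false};
  std::atomic<bool> is_finalized{false};

  bool IsInitialized() const { return !is_finalized.load() && is_initialized.load(); }
  bool IsFinalized() const { return is_finalized.load(); }

  void Initialize() { is_initialized.store(true); }
  void Finalize() { is_finalized.store(true); }
};

InstanceState state;

// Mirrors the proposed CheckS3Finalized(): a plain atomic load, no mutex taken.
std::string CheckFinalized() {
  return state.IsFinalized() ? "Invalid: S3 subsystem is finalized" : "OK";
}

int main() {
  state.Initialize();
  std::cout << CheckFinalized() << std::endl;  // OK
  state.Finalize();
  std::cout << CheckFinalized() << std::endl;  // Invalid: S3 subsystem is finalized
  return 0;
}
```

With the patch above applied, an S3 operation attempted after `arrow::fs::FinalizeS3()` should surface as `Status::Invalid("S3 subsystem is finalized")` rather than reaching AWS SDK code that has already been shut down.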