This is an automated email from the ASF dual-hosted git repository.
apitrou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 036fca0ae5 GH-41862: [C++][S3] Fix potential deadlock when closing
output stream (#41876)
036fca0ae5 is described below
commit 036fca0ae5c8956c83b69478d413c24f32398f8c
Author: Antoine Pitrou <[email protected]>
AuthorDate: Mon Jun 10 16:47:56 2024 +0200
GH-41862: [C++][S3] Fix potential deadlock when closing output stream
(#41876)
### Rationale for this change
When the Future returned by `OutputStream::CloseAsync` finishes, it can
invoke a user-supplied callback. That callback may well destroy the stream as a
side effect. If the stream is an S3 output stream, this might lead to a deadlock
involving the mutex in the output stream's `UploadState` structure, since the
callback is called with that mutex locked.
### What changes are included in this PR?
Unlock the `UploadState` mutex before marking the Future finished, to avoid
deadlocking.
### Are these changes tested?
No. Unfortunately, I wasn't able to write a test that would trigger the
original condition. Additional preconditions seem to be required to reproduce
the deadlock. For example, it might require a mutex implementation that hangs
if destroyed while locked.
### Are there any user-facing changes?
No.
* GitHub Issue: #41862
Authored-by: Antoine Pitrou <[email protected]>
Signed-off-by: Antoine Pitrou <[email protected]>
---
cpp/src/arrow/filesystem/s3fs.cc | 18 +++++++++++++++---
cpp/src/arrow/filesystem/s3fs_test.cc | 31 +++++++++++++++++++++++++++++--
2 files changed, 44 insertions(+), 5 deletions(-)
diff --git a/cpp/src/arrow/filesystem/s3fs.cc b/cpp/src/arrow/filesystem/s3fs.cc
index c456be2d0d..99cee19ed1 100644
--- a/cpp/src/arrow/filesystem/s3fs.cc
+++ b/cpp/src/arrow/filesystem/s3fs.cc
@@ -1606,6 +1606,10 @@ class ObjectOutputStream final : public io::OutputStream
{
io::internal::CloseFromDestructor(this);
}
+ std::shared_ptr<ObjectOutputStream> Self() {
+ return std::dynamic_pointer_cast<ObjectOutputStream>(shared_from_this());
+ }
+
Status Init() {
ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock());
@@ -1724,9 +1728,9 @@ class ObjectOutputStream final : public io::OutputStream {
RETURN_NOT_OK(EnsureReadyToFlushFromClose());
- auto self =
std::dynamic_pointer_cast<ObjectOutputStream>(shared_from_this());
// Wait for in-progress uploads to finish (if async writes are enabled)
- return FlushAsync().Then([self]() { return
self->FinishPartUploadAfterFlush(); });
+ return FlushAsync().Then(
+ [self = Self()]() { return self->FinishPartUploadAfterFlush(); });
}
bool closed() const override { return closed_; }
@@ -1892,7 +1896,15 @@ class ObjectOutputStream final : public io::OutputStream
{
}
// Notify completion
if (--state->parts_in_progress == 0) {
- state->pending_parts_completed.MarkFinished(state->status);
+ // GH-41862: avoid potential deadlock if the Future's callback is called
+ // with the mutex taken.
+ auto fut = state->pending_parts_completed;
+ lock.unlock();
+ // State could be mutated concurrently if another thread writes to the
+ // stream, but in this case the Flush() call is only advisory anyway.
+ // Besides, it's not generally sound to write to an OutputStream from
+ // several threads at once.
+ fut.MarkFinished(state->status);
}
}
diff --git a/cpp/src/arrow/filesystem/s3fs_test.cc
b/cpp/src/arrow/filesystem/s3fs_test.cc
index 7bfa120eda..5a160a78ce 100644
--- a/cpp/src/arrow/filesystem/s3fs_test.cc
+++ b/cpp/src/arrow/filesystem/s3fs_test.cc
@@ -614,9 +614,26 @@ class TestS3FS : public S3TestMixin {
// after CloseAsync or synchronously after stream.reset() since we're just
// checking that `closeAsyncFut` keeps the stream alive until completion
// rather than segfaulting on a dangling stream
- auto closeAsyncFut = stream->CloseAsync();
+ auto close_fut = stream->CloseAsync();
stream.reset();
- ASSERT_OK(closeAsyncFut.MoveResult());
+ ASSERT_OK(close_fut.MoveResult());
+ AssertObjectContents(client_.get(), "bucket", "somefile", "new data");
+ }
+
+ void TestOpenOutputStreamCloseAsyncFutureDeadlock() {
+ // This is inspired by GH-41862, though it fails to reproduce the actual
+ // issue reported there (actual preconditions might be required).
+ // Here we lose our reference to an output stream from its CloseAsync
callback.
+ // This should not deadlock.
+ std::shared_ptr<io::OutputStream> stream;
+ ASSERT_OK_AND_ASSIGN(stream, fs_->OpenOutputStream("bucket/somefile"));
+ ASSERT_OK(stream->Write("new data"));
+ auto close_fut = stream->CloseAsync();
+ close_fut.AddCallback([stream = std::move(stream)](Status st) mutable {
+ // Trigger stream destruction from callback
+ stream.reset();
+ });
+ ASSERT_OK(close_fut.MoveResult());
AssertObjectContents(client_.get(), "bucket", "somefile", "new data");
}
@@ -1254,6 +1271,16 @@ TEST_F(TestS3FS,
OpenOutputStreamAsyncDestructorSyncWrite) {
TestOpenOutputStreamCloseAsyncDestructor();
}
+TEST_F(TestS3FS, OpenOutputStreamCloseAsyncFutureDeadlockBackgroundWrites) {
+ TestOpenOutputStreamCloseAsyncFutureDeadlock();
+}
+
+TEST_F(TestS3FS, OpenOutputStreamCloseAsyncFutureDeadlockSyncWrite) {
+ options_.background_writes = false;
+ MakeFileSystem();
+ TestOpenOutputStreamCloseAsyncFutureDeadlock();
+}
+
TEST_F(TestS3FS, OpenOutputStreamMetadata) {
std::shared_ptr<io::OutputStream> stream;