This is an automated email from the ASF dual-hosted git repository.

apitrou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 036fca0ae5 GH-41862: [C++][S3] Fix potential deadlock when closing 
output stream (#41876)
036fca0ae5 is described below

commit 036fca0ae5c8956c83b69478d413c24f32398f8c
Author: Antoine Pitrou <[email protected]>
AuthorDate: Mon Jun 10 16:47:56 2024 +0200

    GH-41862: [C++][S3] Fix potential deadlock when closing output stream 
(#41876)
    
    ### Rationale for this change
    
    When the Future returned by `OutputStream::CloseAsync` finishes, it can 
invoke a user-supplied callback. That callback may well destroy the stream as a 
side effect. If the stream is an S3 output stream, this might lead to a deadlock 
involving the mutex in the output stream's `UploadState` structure, since the 
callback is called with that mutex locked.
    
    ### What changes are included in this PR?
    
    Unlock the `UploadState` mutex before marking the Future finished, to avoid 
deadlocking.
    
    ### Are these changes tested?
    
    No. Unfortunately, I wasn't able to write a test that would trigger the 
original condition. Additional preconditions seem to be required to reproduce 
the deadlock. For example, it might require a mutex implementation that hangs 
if destroyed while locked.
    
    ### Are there any user-facing changes?
    
    No.
    
    * GitHub Issue: #41862
    
    Authored-by: Antoine Pitrou <[email protected]>
    Signed-off-by: Antoine Pitrou <[email protected]>
---
 cpp/src/arrow/filesystem/s3fs.cc      | 18 +++++++++++++++---
 cpp/src/arrow/filesystem/s3fs_test.cc | 31 +++++++++++++++++++++++++++++--
 2 files changed, 44 insertions(+), 5 deletions(-)

diff --git a/cpp/src/arrow/filesystem/s3fs.cc b/cpp/src/arrow/filesystem/s3fs.cc
index c456be2d0d..99cee19ed1 100644
--- a/cpp/src/arrow/filesystem/s3fs.cc
+++ b/cpp/src/arrow/filesystem/s3fs.cc
@@ -1606,6 +1606,10 @@ class ObjectOutputStream final : public io::OutputStream {
     io::internal::CloseFromDestructor(this);
   }
 
+  std::shared_ptr<ObjectOutputStream> Self() {
+    return std::dynamic_pointer_cast<ObjectOutputStream>(shared_from_this());
+  }
+
   Status Init() {
     ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock());
 
@@ -1724,9 +1728,9 @@ class ObjectOutputStream final : public io::OutputStream {
 
     RETURN_NOT_OK(EnsureReadyToFlushFromClose());
 
-    auto self = std::dynamic_pointer_cast<ObjectOutputStream>(shared_from_this());
     // Wait for in-progress uploads to finish (if async writes are enabled)
-    return FlushAsync().Then([self]() { return self->FinishPartUploadAfterFlush(); });
+    return FlushAsync().Then(
+        [self = Self()]() { return self->FinishPartUploadAfterFlush(); });
   }
 
   bool closed() const override { return closed_; }
@@ -1892,7 +1896,15 @@ class ObjectOutputStream final : public io::OutputStream {
     }
     // Notify completion
     if (--state->parts_in_progress == 0) {
-      state->pending_parts_completed.MarkFinished(state->status);
+      // GH-41862: avoid potential deadlock if the Future's callback is called
+      // with the mutex taken.
+      auto fut = state->pending_parts_completed;
+      lock.unlock();
+      // State could be mutated concurrently if another thread writes to the
+      // stream, but in this case the Flush() call is only advisory anyway.
+      // Besides, it's not generally sound to write to an OutputStream from
+      // several threads at once.
+      fut.MarkFinished(state->status);
     }
   }
 
diff --git a/cpp/src/arrow/filesystem/s3fs_test.cc b/cpp/src/arrow/filesystem/s3fs_test.cc
index 7bfa120eda..5a160a78ce 100644
--- a/cpp/src/arrow/filesystem/s3fs_test.cc
+++ b/cpp/src/arrow/filesystem/s3fs_test.cc
@@ -614,9 +614,26 @@ class TestS3FS : public S3TestMixin {
     // after CloseAsync or synchronously after stream.reset() since we're just
     // checking that `closeAsyncFut` keeps the stream alive until completion
     // rather than segfaulting on a dangling stream
-    auto closeAsyncFut = stream->CloseAsync();
+    auto close_fut = stream->CloseAsync();
     stream.reset();
-    ASSERT_OK(closeAsyncFut.MoveResult());
+    ASSERT_OK(close_fut.MoveResult());
+    AssertObjectContents(client_.get(), "bucket", "somefile", "new data");
+  }
+
+  void TestOpenOutputStreamCloseAsyncFutureDeadlock() {
+    // This is inspired by GH-41862, though it fails to reproduce the actual
+    // issue reported there (additional preconditions might be required).
+    // Here we lose our reference to an output stream from its CloseAsync callback.
+    // This should not deadlock.
+    std::shared_ptr<io::OutputStream> stream;
+    ASSERT_OK_AND_ASSIGN(stream, fs_->OpenOutputStream("bucket/somefile"));
+    ASSERT_OK(stream->Write("new data"));
+    auto close_fut = stream->CloseAsync();
+    close_fut.AddCallback([stream = std::move(stream)](Status st) mutable {
+      // Trigger stream destruction from callback
+      stream.reset();
+    });
+    ASSERT_OK(close_fut.MoveResult());
     AssertObjectContents(client_.get(), "bucket", "somefile", "new data");
   }
 
@@ -1254,6 +1271,16 @@ TEST_F(TestS3FS, OpenOutputStreamAsyncDestructorSyncWrite) {
   TestOpenOutputStreamCloseAsyncDestructor();
 }
 
+TEST_F(TestS3FS, OpenOutputStreamCloseAsyncFutureDeadlockBackgroundWrites) {
+  TestOpenOutputStreamCloseAsyncFutureDeadlock();
+}
+
+TEST_F(TestS3FS, OpenOutputStreamCloseAsyncFutureDeadlockSyncWrite) {
+  options_.background_writes = false;
+  MakeFileSystem();
+  TestOpenOutputStreamCloseAsyncFutureDeadlock();
+}
+
 TEST_F(TestS3FS, OpenOutputStreamMetadata) {
   std::shared_ptr<io::OutputStream> stream;
 

Reply via email to