This is an automated email from the ASF dual-hosted git repository.

kou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new f7286a9530 GH-38884: [C++] DatasetWriter release 
rows_in_flight_throttle when allocate writing failed (#38885)
f7286a9530 is described below

commit f7286a95304a848033d96817f704ace0765c0ec7
Author: mwish <[email protected]>
AuthorDate: Thu Dec 7 16:21:14 2023 +0800

    GH-38884: [C++] DatasetWriter release rows_in_flight_throttle when allocate 
writing failed (#38885)
    
    
    
    ### Rationale for this change
    
    When the file-queue is full or a write fails, 
`DatasetWriterImpl::DoWriteRecordBatch` might fail; however, the resources 
are not released.
    
    ### What changes are included in this PR?
    
    When the file-queue is full or the file cannot be opened, release the acquired `rows_in_flight_throttle` resources.
    
    ### Are these changes tested?
    
    yes
    
    ### Are there any user-facing changes?
    
    no
    
    * Closes: #38884
    
    Authored-by: mwish <[email protected]>
    Signed-off-by: Sutou Kouhei <[email protected]>
---
 cpp/src/arrow/dataset/dataset_writer.cc      | 15 ++++++++++++--
 cpp/src/arrow/dataset/dataset_writer_test.cc | 31 ++++++++++++++++++++++++++--
 2 files changed, 42 insertions(+), 4 deletions(-)

diff --git a/cpp/src/arrow/dataset/dataset_writer.cc 
b/cpp/src/arrow/dataset/dataset_writer.cc
index a2096d691b..ae9fb36484 100644
--- a/cpp/src/arrow/dataset/dataset_writer.cc
+++ b/cpp/src/arrow/dataset/dataset_writer.cc
@@ -87,7 +87,7 @@ class Throttle {
 
  private:
   Future<> backpressure_ = Future<>::MakeFinished();
-  uint64_t max_value_;
+  const uint64_t max_value_;
   uint64_t in_waiting_ = 0;
   uint64_t current_value_ = 0;
   std::mutex mutex_;
@@ -621,11 +621,21 @@ class DatasetWriter::DatasetWriterImpl {
         backpressure = writer_state_.open_files_throttle.Acquire(1);
         if (!backpressure.is_finished()) {
           
EVENT_ON_CURRENT_SPAN("DatasetWriter::Backpressure::TooManyOpenFiles");
+          
writer_state_.rows_in_flight_throttle.Release(next_chunk->num_rows());
           RETURN_NOT_OK(TryCloseLargestFile());
           break;
         }
       }
-      RETURN_NOT_OK(dir_queue->StartWrite(next_chunk));
+      auto s = dir_queue->StartWrite(next_chunk);
+      if (!s.ok()) {
+        // If `StartWrite` succeeded, it will Release the
+        // `rows_in_flight_throttle` when the write task is finished.
+        //
+        // `open_files_throttle` will be handed by 
`DatasetWriterDirectoryQueue`
+        // so we don't need to release it here.
+        writer_state_.rows_in_flight_throttle.Release(next_chunk->num_rows());
+        return s;
+      }
       batch = std::move(remainder);
       if (batch) {
         RETURN_NOT_OK(dir_queue->FinishCurrentFile());
@@ -647,6 +657,7 @@ class DatasetWriter::DatasetWriterImpl {
   DatasetWriterState writer_state_;
   std::function<void()> pause_callback_;
   std::function<void()> resume_callback_;
+  // Map from directory + prefix to the queue for that directory
   std::unordered_map<std::string, std::shared_ptr<DatasetWriterDirectoryQueue>>
       directory_queues_;
   std::mutex mutex_;
diff --git a/cpp/src/arrow/dataset/dataset_writer_test.cc 
b/cpp/src/arrow/dataset/dataset_writer_test.cc
index c76e79d79b..e62e779f71 100644
--- a/cpp/src/arrow/dataset/dataset_writer_test.cc
+++ b/cpp/src/arrow/dataset/dataset_writer_test.cc
@@ -189,7 +189,8 @@ class DatasetWriterTestFixture : public testing::Test {
     }
   }
 
-  void AssertCreatedData(const std::vector<ExpectedFile>& expected_files) {
+  void AssertCreatedData(const std::vector<ExpectedFile>& expected_files,
+                         bool check_num_record_batches = true) {
     counter_ = 0;
     for (const auto& expected_file : expected_files) {
       std::optional<MockFileInfo> written_file = 
FindFile(expected_file.filename);
@@ -197,7 +198,9 @@ class DatasetWriterTestFixture : public testing::Test {
       int num_batches = 0;
       AssertBatchesEqual(*MakeBatch(expected_file.start, 
expected_file.num_rows),
                          *ReadAsBatch(written_file->data, &num_batches));
-      ASSERT_EQ(expected_file.num_record_batches, num_batches);
+      if (check_num_record_batches) {
+        ASSERT_EQ(expected_file.num_record_batches, num_batches);
+      }
     }
   }
 
@@ -277,6 +280,30 @@ TEST_F(DatasetWriterTestFixture, MaxRowsOneWrite) {
                      {"testdir/chunk-3.arrow", 30, 5}});
 }
 
+TEST_F(DatasetWriterTestFixture, MaxRowsOneWriteBackpresure) {
+  // GH-38884: This test is to make sure that the writer can handle
+  //  throttle resources in `WriteRecordBatch`.
+
+  constexpr auto kFileSizeLimit = static_cast<uint64_t>(10);
+  write_options_.max_rows_per_file = kFileSizeLimit;
+  write_options_.max_rows_per_group = kFileSizeLimit;
+  write_options_.max_open_files = 2;
+  write_options_.min_rows_per_group = kFileSizeLimit - 1;
+  auto dataset_writer = MakeDatasetWriter(/*max_rows=*/kFileSizeLimit);
+  for (int i = 0; i < 20; ++i) {
+    dataset_writer->WriteRecordBatch(MakeBatch(kFileSizeLimit * 5), "");
+  }
+  EndWriterChecked(dataset_writer.get());
+  std::vector<ExpectedFile> expected_files;
+  for (int i = 0; i < 100; ++i) {
+    expected_files.emplace_back("testdir/chunk-" + std::to_string(i) + 
".arrow",
+                                kFileSizeLimit * i, kFileSizeLimit);
+  }
+  // Not checking the number of record batches because file may contain the
+  // zero-length record batch.
+  AssertCreatedData(expected_files, /*check_num_record_batches=*/false);
+}
+
 TEST_F(DatasetWriterTestFixture, MaxRowsOneWriteWithFunctor) {
   // Left padding with up to four zeros
   write_options_.max_rows_per_group = 10;

Reply via email to