kou commented on code in PR #38885:
URL: https://github.com/apache/arrow/pull/38885#discussion_r1411552495


##########
cpp/src/arrow/dataset/dataset_writer.cc:
##########
@@ -621,11 +621,21 @@ class DatasetWriter::DatasetWriterImpl {
         backpressure = writer_state_.open_files_throttle.Acquire(1);
         if (!backpressure.is_finished()) {
           EVENT_ON_CURRENT_SPAN("DatasetWriter::Backpressure::TooManyOpenFiles");
+          writer_state_.rows_in_flight_throttle.Release(next_chunk->num_rows());
           RETURN_NOT_OK(TryCloseLargestFile());
           break;
         }
       }
-      RETURN_NOT_OK(dir_queue->StartWrite(next_chunk));
+      auto s = dir_queue->StartWrite(next_chunk);
+      if (!s.ok()) {
+        // If `StartWrite` succeeded, it will Release the
+        // `rows_in_flight_throttle` when the write task is finished.
+        //
+        // `open_files_throttle` will be handled by `DatasetWriterDirectoryQueue`
+        // so we don't need to release it here.
+        writer_state_.rows_in_flight_throttle.Release(next_chunk->num_rows());

Review Comment:
   It seems that the added test passes even without this change.
   Is it difficult to write a test for this change?
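   For context, here is a minimal, self-contained sketch of the release-on-failure pattern the patch adds. `Throttle`, `Acquire`, and `Release` below are simplified stand-ins rather than the Arrow classes; the point is only to show why the failure branch has to hand its reserved rows back.

   ```cpp
   #include <cstdint>
   #include <iostream>

   // Simplified stand-in for the counting throttles in dataset_writer.cc;
   // this is not the Arrow implementation.
   class Throttle {
    public:
     explicit Throttle(uint64_t limit) : available_(limit) {}
     bool Acquire(uint64_t n) {
       if (n > available_) return false;
       available_ -= n;
       return true;
     }
     void Release(uint64_t n) { available_ += n; }
     uint64_t available() const { return available_; }

    private:
     uint64_t available_;
   };

   // Mirrors the shape of the fix: rows are reserved before the write is
   // started, so a failed start must hand them back or the throttle leaks
   // capacity and later writes stall.
   bool StartWriteOrRelease(Throttle& rows_in_flight, uint64_t num_rows,
                            bool start_fails) {
     if (!rows_in_flight.Acquire(num_rows)) return false;
     if (start_fails) {
       // Corresponds to the `!s.ok()` branch in the patch above.
       rows_in_flight.Release(num_rows);
       return false;
     }
     // On success the write task releases the rows when it finishes.
     return true;
   }

   int main() {
     Throttle rows_in_flight(100);
     StartWriteOrRelease(rows_in_flight, 40, /*start_fails=*/true);
     // Prints 100; without the Release in the failure branch it would print 60.
     std::cout << rows_in_flight.available() << std::endl;
     return 0;
   }
   ```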



##########
cpp/src/arrow/dataset/dataset_writer_test.cc:
##########
@@ -277,6 +277,37 @@ TEST_F(DatasetWriterTestFixture, MaxRowsOneWrite) {
                      {"testdir/chunk-3.arrow", 30, 5}});
 }
 
+TEST_F(DatasetWriterTestFixture, MaxRowsOneWrite2) {
+  // GH-38884: This test is to make sure that the writer can handle
+  //  throttle resources in `WriteRecordBatch`.
+
+  // CalculateMaxRowsStaged will make at least 1 << 23 rows for
+  // `rows_in_flight` throttle. So we set the limit to 1 << 23.
+  constexpr auto FILE_SIZE_LIMIT = static_cast<uint64_t>(1 << 23);

Review Comment:
   See also: https://google.github.io/styleguide/cppguide.html#Constant_Names
   
   ```suggestion
     constexpr auto kFileSizeLimit = static_cast<uint64_t>(1 << 23);
   ```



##########
cpp/src/arrow/dataset/dataset_writer_test.cc:
##########
@@ -277,6 +277,37 @@ TEST_F(DatasetWriterTestFixture, MaxRowsOneWrite) {
                      {"testdir/chunk-3.arrow", 30, 5}});
 }
 
+TEST_F(DatasetWriterTestFixture, MaxRowsOneWrite2) {

Review Comment:
   Can we use a more meaningful test name? `MaxRowsOneWriteBackpressure`?



##########
cpp/src/arrow/dataset/dataset_writer_test.cc:
##########
@@ -277,6 +277,37 @@ TEST_F(DatasetWriterTestFixture, MaxRowsOneWrite) {
                      {"testdir/chunk-3.arrow", 30, 5}});
 }
 
+TEST_F(DatasetWriterTestFixture, MaxRowsOneWrite2) {
+  // GH-38884: This test is to make sure that the writer can handle
+  //  throttle resources in `WriteRecordBatch`.
+
+  // CalculateMaxRowsStaged will make at least 1 << 23 rows for
+  // `rows_in_flight` throttle. So we set the limit to 1 << 23.
+  constexpr auto FILE_SIZE_LIMIT = static_cast<uint64_t>(1 << 23);

Review Comment:
   Hmm. This test takes 8 seconds on my local machine...
   How about making `CalculateMaxRowsStaged` customizable, something like the following, and using a small value for this test?
   
   ```diff
   diff --git a/cpp/src/arrow/dataset/dataset_writer.cc b/cpp/src/arrow/dataset/dataset_writer.cc
   index ae9fb3648..7c5a5befd 100644
   --- a/cpp/src/arrow/dataset/dataset_writer.cc
   +++ b/cpp/src/arrow/dataset/dataset_writer.cc
   @@ -500,8 +500,8 @@ Status EnsureDestinationValid(const FileSystemDatasetWriteOptions& options) {
    // Rule of thumb for the max rows to stage.  It will grow with max_rows_queued until
    // max_rows_queued starts to get too large and then it caps out at 8 million rows.
    // Feel free to replace with something more meaningful, this is just a random heuristic.
   -uint64_t CalculateMaxRowsStaged(uint64_t max_rows_queued) {
   -  return std::min(static_cast<uint64_t>(1 << 23), max_rows_queued / 4);
   +uint64_t CalculateMaxRowsStaged(const FileSystemDatasetWriteOptions& options, uint64_t max_rows_queued) {
   +  return std::min(options.max_rows_staged, max_rows_queued / 4);
    }
    
    }  // namespace
   @@ -522,7 +522,7 @@ class DatasetWriter::DatasetWriterImpl {
                })),
            write_options_(std::move(write_options)),
            writer_state_(max_rows_queued, write_options_.max_open_files,
   -                      CalculateMaxRowsStaged(max_rows_queued)),
   +                      CalculateMaxRowsStaged(write_options_, max_rows_queued)),
            pause_callback_(std::move(pause_callback)),
            resume_callback_(std::move(resume_callback)) {}
    
   diff --git a/cpp/src/arrow/dataset/file_base.h b/cpp/src/arrow/dataset/file_base.h
   index 46fc8ebc4..62fda95e2 100644
   --- a/cpp/src/arrow/dataset/file_base.h
   +++ b/cpp/src/arrow/dataset/file_base.h
   @@ -445,6 +445,9 @@ struct ARROW_DS_EXPORT FileSystemDatasetWriteOptions {
      /// This is mainly intended for filesystems that do not require directories such as S3.
      bool create_dir = true;
    
   +  /// ...
   +  uint64_t max_rows_staged = 1 << 23;
   +
      /// Callback to be invoked against all FileWriters before
      /// they are finalized with FileWriter::Finish().
      std::function<Status(FileWriter*)> writer_pre_finish = [](FileWriter*) {
   ```
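   For illustration only, here is a standalone sketch of why a configurable cap would speed the test up. The configurable variant below takes the cap value directly instead of the whole `FileSystemDatasetWriteOptions`, just to keep the example self-contained:

   ```cpp
   #include <algorithm>
   #include <cstdint>
   #include <iostream>

   // Current heuristic (the `-` lines above): the staging limit is capped at
   // 1 << 23 (~8.4 million) rows.
   uint64_t CalculateMaxRowsStagedFixed(uint64_t max_rows_queued) {
     return std::min(static_cast<uint64_t>(1 << 23), max_rows_queued / 4);
   }

   // Proposed variant (the `+` lines above), simplified here to take the cap
   // directly instead of the options struct.
   uint64_t CalculateMaxRowsStagedConfigurable(uint64_t max_rows_staged,
                                               uint64_t max_rows_queued) {
     return std::min(max_rows_staged, max_rows_queued / 4);
   }

   int main() {
     // With the hard-coded cap, a test has to push roughly 8 million rows
     // before it saturates the staging limit.
     std::cout << CalculateMaxRowsStagedFixed(1ull << 30) << std::endl;  // 8388608
     // With a configurable cap, the same backpressure path is reachable with a
     // tiny amount of data, which keeps the test fast.
     std::cout << CalculateMaxRowsStagedConfigurable(1 << 10, 1ull << 30)
               << std::endl;  // 1024
     return 0;
   }
   ```

   With something like the proposed `max_rows_staged` option, the test could then use a small value (for example `1 << 10`) and still hit the staging backpressure path quickly.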


