This is an automated email from the ASF dual-hosted git repository.

bkietz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 951d92a7d5 GH-38438: [C++] Dataset: Trying to fix the async bug in 
Parquet dataset (#38466)
951d92a7d5 is described below

commit 951d92a7d5f9e09e5a2f603faf3ff11b92328f11
Author: mwish <[email protected]>
AuthorDate: Sat Nov 18 03:46:02 2023 +0800

    GH-38438: [C++] Dataset: Trying to fix the async bug in Parquet dataset 
(#38466)
    
    
    
    ### Rationale for this change
    
    Originally reported in https://github.com/apache/arrow/issues/38438
    
    1. When PreBuffer is enabled by default, the code in 
`RowGroupGenerator::FetchNext` switches to async mode. This makes the state 
handling more complex.
    2. In `RowGroupGenerator::FetchNext`, `[this]` is captured without 
`shared_from_this`. This is not necessarily a problem by itself; however, 
`this->cpu_executor_` may be read through a dangling pointer if the generator 
has already been destroyed when the callback runs.
    
    This patch also fixes a lifetime issue I found in CSV handling.
    
    ### What changes are included in this PR?
    
    1. Fix handling in `cpp/src/parquet/arrow/reader.cc` as described above
    2. Fix a lifetime problem in CSV
    
    ### Are these changes tested?
    
    I tested it locally, but I don't know how to write a unit test for this. 
Feel free to help.
    
    ### Are there any user-facing changes?
    
    Bugfix
    
    * Closes: #38438
    
    Authored-by: mwish <[email protected]>
    Signed-off-by: Benjamin Kietzman <[email protected]>
---
 cpp/src/arrow/csv/reader.cc                | 21 ++++-----
 cpp/src/arrow/dataset/file_parquet_test.cc | 69 ++++++++++++++++++++++++++++++
 cpp/src/parquet/arrow/reader.cc            |  6 +--
 3 files changed, 83 insertions(+), 13 deletions(-)

diff --git a/cpp/src/arrow/csv/reader.cc b/cpp/src/arrow/csv/reader.cc
index bf703b6c6b..30fc0bc6ac 100644
--- a/cpp/src/arrow/csv/reader.cc
+++ b/cpp/src/arrow/csv/reader.cc
@@ -1113,16 +1113,17 @@ class AsyncThreadedTableReader
   Future<std::shared_ptr<Buffer>> ProcessFirstBuffer() {
     // First block
     auto first_buffer_future = buffer_generator_();
-    return first_buffer_future.Then([this](const std::shared_ptr<Buffer>& 
first_buffer)
-                                        -> Result<std::shared_ptr<Buffer>> {
-      if (first_buffer == nullptr) {
-        return Status::Invalid("Empty CSV file");
-      }
-      std::shared_ptr<Buffer> first_buffer_processed;
-      RETURN_NOT_OK(ProcessHeader(first_buffer, &first_buffer_processed));
-      RETURN_NOT_OK(MakeColumnBuilders());
-      return first_buffer_processed;
-    });
+    return first_buffer_future.Then(
+        [self = shared_from_this()](const std::shared_ptr<Buffer>& 
first_buffer)
+            -> Result<std::shared_ptr<Buffer>> {
+          if (first_buffer == nullptr) {
+            return Status::Invalid("Empty CSV file");
+          }
+          std::shared_ptr<Buffer> first_buffer_processed;
+          RETURN_NOT_OK(self->ProcessHeader(first_buffer, 
&first_buffer_processed));
+          RETURN_NOT_OK(self->MakeColumnBuilders());
+          return first_buffer_processed;
+        });
   }
 
   Executor* cpu_executor_;
diff --git a/cpp/src/arrow/dataset/file_parquet_test.cc 
b/cpp/src/arrow/dataset/file_parquet_test.cc
index c22cf33eb3..84d4342a25 100644
--- a/cpp/src/arrow/dataset/file_parquet_test.cc
+++ b/cpp/src/arrow/dataset/file_parquet_test.cc
@@ -834,5 +834,74 @@ TEST(TestParquetStatistics, NullMax) {
   EXPECT_EQ(stat_expression->ToString(), "(x >= 1)");
 }
 
+class DelayedBufferReader : public ::arrow::io::BufferReader {
+ public:
+  explicit DelayedBufferReader(const std::shared_ptr<::arrow::Buffer>& buffer)
+      : ::arrow::io::BufferReader(buffer) {}
+
+  ::arrow::Future<std::shared_ptr<Buffer>> ReadAsync(
+      const ::arrow::io::IOContext& io_context, int64_t position,
+      int64_t nbytes) override {
+    read_async_count.fetch_add(1);
+    auto self = 
std::dynamic_pointer_cast<DelayedBufferReader>(shared_from_this());
+    return DeferNotOk(::arrow::io::internal::SubmitIO(
+        io_context, [self, position, nbytes]() -> 
Result<std::shared_ptr<Buffer>> {
+          std::this_thread::sleep_for(std::chrono::seconds(1));
+          return self->DoReadAt(position, nbytes);
+        }));
+  }
+
+  std::atomic<int> read_async_count{0};
+};
+
+TEST_F(TestParquetFileFormat, MultithreadedScanRegression) {
+  // GH-38438: This test is similar to MultithreadedScan, but it try to use 
self
+  // designed Executor and DelayedBufferReader to mock async execution to make
+  // the state machine more complex.
+  auto reader = MakeGeneratedRecordBatch(schema({field("utf8", utf8())}), 
10000, 100);
+
+  ASSERT_OK_AND_ASSIGN(auto buffer, ParquetFormatHelper::Write(reader.get()));
+
+  std::vector<Future<>> completes;
+  std::vector<std::shared_ptr<arrow::internal::ThreadPool>> pools;
+
+  for (int idx = 0; idx < 2; ++idx) {
+    auto buffer_reader = std::make_shared<DelayedBufferReader>(buffer);
+    auto source = std::make_shared<FileSource>(buffer_reader, buffer->size());
+    auto fragment = MakeFragment(*source);
+    std::shared_ptr<Scanner> scanner;
+
+    {
+      auto options = std::make_shared<ScanOptions>();
+      ASSERT_OK_AND_ASSIGN(auto thread_pool, 
arrow::internal::ThreadPool::Make(1));
+      pools.emplace_back(thread_pool);
+      options->io_context =
+          ::arrow::io::IOContext(::arrow::default_memory_pool(), 
pools.back().get());
+      auto fragment_scan_options = 
std::make_shared<ParquetFragmentScanOptions>();
+      fragment_scan_options->arrow_reader_properties->set_pre_buffer(true);
+
+      options->fragment_scan_options = fragment_scan_options;
+      ScannerBuilder builder(ArithmeticDatasetFixture::schema(), fragment, 
options);
+
+      ASSERT_OK(builder.UseThreads(true));
+      ASSERT_OK(builder.BatchSize(10000));
+      ASSERT_OK_AND_ASSIGN(scanner, builder.Finish());
+    }
+
+    ASSERT_OK_AND_ASSIGN(auto batch, scanner->Head(10000));
+    [[maybe_unused]] auto fut = scanner->ScanBatchesUnorderedAsync();
+    // Random ReadAsync calls, generate some futures to make the state machine
+    // more complex.
+    for (int yy = 0; yy < 16; yy++) {
+      
completes.emplace_back(buffer_reader->ReadAsync(::arrow::io::IOContext(), 0, 
1001));
+    }
+    scanner = nullptr;
+  }
+
+  for (auto& f : completes) {
+    f.Wait();
+  }
+}
+
 }  // namespace dataset
 }  // namespace arrow
diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc
index 99b8a9ccef..d6ad7c25bc 100644
--- a/cpp/src/parquet/arrow/reader.cc
+++ b/cpp/src/parquet/arrow/reader.cc
@@ -1123,10 +1123,10 @@ class RowGroupGenerator {
       auto ready = reader->parquet_reader()->WhenBuffered({row_group}, 
column_indices);
       if (cpu_executor_) ready = cpu_executor_->TransferAlways(ready);
       row_group_read =
-          ready.Then([this, reader, row_group,
+          ready.Then([cpu_executor = cpu_executor_, reader, row_group,
                       column_indices = std::move(
                           column_indices)]() -> 
::arrow::Future<RecordBatchGenerator> {
-            return ReadOneRowGroup(cpu_executor_, reader, row_group, 
column_indices);
+            return ReadOneRowGroup(cpu_executor, reader, row_group, 
column_indices);
           });
     }
     in_flight_reads_.push({std::move(row_group_read), num_rows});
@@ -1182,7 +1182,7 @@ 
FileReaderImpl::GetRecordBatchGenerator(std::shared_ptr<FileReader> reader,
                                         int64_t rows_to_readahead) {
   RETURN_NOT_OK(BoundsCheck(row_group_indices, column_indices));
   if (rows_to_readahead < 0) {
-    return Status::Invalid("rows_to_readahead must be > 0");
+    return Status::Invalid("rows_to_readahead must be >= 0");
   }
   if (reader_properties_.pre_buffer()) {
     BEGIN_PARQUET_CATCH_EXCEPTIONS

Reply via email to