This is an automated email from the ASF dual-hosted git repository.
bkietz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 951d92a7d5 GH-38438: [C++] Dataset: Trying to fix the async bug in
Parquet dataset (#38466)
951d92a7d5 is described below
commit 951d92a7d5f9e09e5a2f603faf3ff11b92328f11
Author: mwish <[email protected]>
AuthorDate: Sat Nov 18 03:46:02 2023 +0800
GH-38438: [C++] Dataset: Trying to fix the async bug in Parquet dataset
(#38466)
### Rationale for this change
Origin mentioned https://github.com/apache/arrow/issues/38438
1. When PreBuffer is enabled by default, the code in
`RowGroupGenerator::FetchNext` would switch to async mode. This makes the state
handling more complex.
2. In `RowGroupGenerator::FetchNext`, `[this]` is captured without
`shared_from_this`. This is not inherently bad; however, `this->executor_` may
point to an invalid address if `this` has already been destructed.
This patch also fixes a lifetime issue I found in CSV handling.
### What changes are included in this PR?
1. Fix handling in `cpp/src/parquet/arrow/reader.cc` as I talked above
2. Fix a lifetime problem in CSV
### Are these changes tested?
I tested it locally, but I don't know how to write a unit test here. Feel free
to help.
### Are there any user-facing changes?
Bugfix
* Closes: #38438
Authored-by: mwish <[email protected]>
Signed-off-by: Benjamin Kietzman <[email protected]>
---
cpp/src/arrow/csv/reader.cc | 21 ++++-----
cpp/src/arrow/dataset/file_parquet_test.cc | 69 ++++++++++++++++++++++++++++++
cpp/src/parquet/arrow/reader.cc | 6 +--
3 files changed, 83 insertions(+), 13 deletions(-)
diff --git a/cpp/src/arrow/csv/reader.cc b/cpp/src/arrow/csv/reader.cc
index bf703b6c6b..30fc0bc6ac 100644
--- a/cpp/src/arrow/csv/reader.cc
+++ b/cpp/src/arrow/csv/reader.cc
@@ -1113,16 +1113,17 @@ class AsyncThreadedTableReader
Future<std::shared_ptr<Buffer>> ProcessFirstBuffer() {
// First block
auto first_buffer_future = buffer_generator_();
- return first_buffer_future.Then([this](const std::shared_ptr<Buffer>&
first_buffer)
- -> Result<std::shared_ptr<Buffer>> {
- if (first_buffer == nullptr) {
- return Status::Invalid("Empty CSV file");
- }
- std::shared_ptr<Buffer> first_buffer_processed;
- RETURN_NOT_OK(ProcessHeader(first_buffer, &first_buffer_processed));
- RETURN_NOT_OK(MakeColumnBuilders());
- return first_buffer_processed;
- });
+ return first_buffer_future.Then(
+ [self = shared_from_this()](const std::shared_ptr<Buffer>&
first_buffer)
+ -> Result<std::shared_ptr<Buffer>> {
+ if (first_buffer == nullptr) {
+ return Status::Invalid("Empty CSV file");
+ }
+ std::shared_ptr<Buffer> first_buffer_processed;
+ RETURN_NOT_OK(self->ProcessHeader(first_buffer,
&first_buffer_processed));
+ RETURN_NOT_OK(self->MakeColumnBuilders());
+ return first_buffer_processed;
+ });
}
Executor* cpu_executor_;
diff --git a/cpp/src/arrow/dataset/file_parquet_test.cc
b/cpp/src/arrow/dataset/file_parquet_test.cc
index c22cf33eb3..84d4342a25 100644
--- a/cpp/src/arrow/dataset/file_parquet_test.cc
+++ b/cpp/src/arrow/dataset/file_parquet_test.cc
@@ -834,5 +834,74 @@ TEST(TestParquetStatistics, NullMax) {
EXPECT_EQ(stat_expression->ToString(), "(x >= 1)");
}
+class DelayedBufferReader : public ::arrow::io::BufferReader {
+ public:
+ explicit DelayedBufferReader(const std::shared_ptr<::arrow::Buffer>& buffer)
+ : ::arrow::io::BufferReader(buffer) {}
+
+ ::arrow::Future<std::shared_ptr<Buffer>> ReadAsync(
+ const ::arrow::io::IOContext& io_context, int64_t position,
+ int64_t nbytes) override {
+ read_async_count.fetch_add(1);
+ auto self =
std::dynamic_pointer_cast<DelayedBufferReader>(shared_from_this());
+ return DeferNotOk(::arrow::io::internal::SubmitIO(
+ io_context, [self, position, nbytes]() ->
Result<std::shared_ptr<Buffer>> {
+ std::this_thread::sleep_for(std::chrono::seconds(1));
+ return self->DoReadAt(position, nbytes);
+ }));
+ }
+
+ std::atomic<int> read_async_count{0};
+};
+
+TEST_F(TestParquetFileFormat, MultithreadedScanRegression) {
+ // GH-38438: This test is similar to MultithreadedScan, but it try to use
self
+ // designed Executor and DelayedBufferReader to mock async execution to make
+ // the state machine more complex.
+ auto reader = MakeGeneratedRecordBatch(schema({field("utf8", utf8())}),
10000, 100);
+
+ ASSERT_OK_AND_ASSIGN(auto buffer, ParquetFormatHelper::Write(reader.get()));
+
+ std::vector<Future<>> completes;
+ std::vector<std::shared_ptr<arrow::internal::ThreadPool>> pools;
+
+ for (int idx = 0; idx < 2; ++idx) {
+ auto buffer_reader = std::make_shared<DelayedBufferReader>(buffer);
+ auto source = std::make_shared<FileSource>(buffer_reader, buffer->size());
+ auto fragment = MakeFragment(*source);
+ std::shared_ptr<Scanner> scanner;
+
+ {
+ auto options = std::make_shared<ScanOptions>();
+ ASSERT_OK_AND_ASSIGN(auto thread_pool,
arrow::internal::ThreadPool::Make(1));
+ pools.emplace_back(thread_pool);
+ options->io_context =
+ ::arrow::io::IOContext(::arrow::default_memory_pool(),
pools.back().get());
+ auto fragment_scan_options =
std::make_shared<ParquetFragmentScanOptions>();
+ fragment_scan_options->arrow_reader_properties->set_pre_buffer(true);
+
+ options->fragment_scan_options = fragment_scan_options;
+ ScannerBuilder builder(ArithmeticDatasetFixture::schema(), fragment,
options);
+
+ ASSERT_OK(builder.UseThreads(true));
+ ASSERT_OK(builder.BatchSize(10000));
+ ASSERT_OK_AND_ASSIGN(scanner, builder.Finish());
+ }
+
+ ASSERT_OK_AND_ASSIGN(auto batch, scanner->Head(10000));
+ [[maybe_unused]] auto fut = scanner->ScanBatchesUnorderedAsync();
+ // Random ReadAsync calls, generate some futures to make the state machine
+ // more complex.
+ for (int yy = 0; yy < 16; yy++) {
+
completes.emplace_back(buffer_reader->ReadAsync(::arrow::io::IOContext(), 0,
1001));
+ }
+ scanner = nullptr;
+ }
+
+ for (auto& f : completes) {
+ f.Wait();
+ }
+}
+
} // namespace dataset
} // namespace arrow
diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc
index 99b8a9ccef..d6ad7c25bc 100644
--- a/cpp/src/parquet/arrow/reader.cc
+++ b/cpp/src/parquet/arrow/reader.cc
@@ -1123,10 +1123,10 @@ class RowGroupGenerator {
auto ready = reader->parquet_reader()->WhenBuffered({row_group},
column_indices);
if (cpu_executor_) ready = cpu_executor_->TransferAlways(ready);
row_group_read =
- ready.Then([this, reader, row_group,
+ ready.Then([cpu_executor = cpu_executor_, reader, row_group,
column_indices = std::move(
column_indices)]() ->
::arrow::Future<RecordBatchGenerator> {
- return ReadOneRowGroup(cpu_executor_, reader, row_group,
column_indices);
+ return ReadOneRowGroup(cpu_executor, reader, row_group,
column_indices);
});
}
in_flight_reads_.push({std::move(row_group_read), num_rows});
@@ -1182,7 +1182,7 @@
FileReaderImpl::GetRecordBatchGenerator(std::shared_ptr<FileReader> reader,
int64_t rows_to_readahead) {
RETURN_NOT_OK(BoundsCheck(row_group_indices, column_indices));
if (rows_to_readahead < 0) {
- return Status::Invalid("rows_to_readahead must be > 0");
+ return Status::Invalid("rows_to_readahead must be >= 0");
}
if (reader_properties_.pre_buffer()) {
BEGIN_PARQUET_CATCH_EXCEPTIONS