This is an automated email from the ASF dual-hosted git repository.
zanmato pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new b14a5378ad GH-43694: [C++] Add `Executor *` Option to
`arrow::dataset::ScanOptions` (#43698)
b14a5378ad is described below
commit b14a5378ad7b5cbcad637d2839298dbf4711e29c
Author: Srinivas Lade <[email protected]>
AuthorDate: Thu Sep 11 08:48:06 2025 -0700
GH-43694: [C++] Add `Executor *` Option to `arrow::dataset::ScanOptions`
(#43698)
### Rationale for this change
(See https://github.com/apache/arrow/issues/43694)
### What changes are included in this PR?
Added the option `Executor *cpu_executor` to `arrow::dataset::ScanOptions` and
modified the scanner and its sub-functions to use either the thread pool
specified in the options or the default internal pool when none is provided.
### Are these changes tested?
Added a Parquet scanner test that exercises the new executor option with a
separate thread pool for each fragment.
### Are there any user-facing changes?
Yes, this adds a new option. I'm not sure how to update the documentation, though.
* GitHub Issue: #43694
Lead-authored-by: Scott Routledge <[email protected]>
Co-authored-by: Srinivas Lade <[email protected]>
Co-authored-by: scott-routledge2 <[email protected]>
Co-authored-by: Raúl Cumplido <[email protected]>
Signed-off-by: Rossi Sun <[email protected]>
---
cpp/src/arrow/dataset/file_parquet.cc | 11 ++-
cpp/src/arrow/dataset/file_parquet_test.cc | 146 +++++++++++++++++------------
cpp/src/arrow/dataset/scanner.cc | 21 +++--
cpp/src/arrow/dataset/scanner.h | 8 ++
cpp/src/arrow/util/thread_pool.h | 28 +++++-
5 files changed, 138 insertions(+), 76 deletions(-)
diff --git a/cpp/src/arrow/dataset/file_parquet.cc
b/cpp/src/arrow/dataset/file_parquet.cc
index 1912da40fc..7ef6061870 100644
--- a/cpp/src/arrow/dataset/file_parquet.cc
+++ b/cpp/src/arrow/dataset/file_parquet.cc
@@ -36,6 +36,7 @@
#include "arrow/util/iterator.h"
#include "arrow/util/logging_internal.h"
#include "arrow/util/range.h"
+#include "arrow/util/thread_pool.h"
#include "arrow/util/tracing_internal.h"
#include "parquet/arrow/reader.h"
#include "parquet/arrow/schema.h"
@@ -642,10 +643,12 @@ Result<RecordBatchGenerator>
ParquetFileFormat::ScanBatchesAsync(
kParquetTypeName, options.get(), default_fragment_scan_options));
int batch_readahead = options->batch_readahead;
int64_t rows_to_readahead = batch_readahead * options->batch_size;
-  ARROW_ASSIGN_OR_RAISE(auto generator,
-                        reader->GetRecordBatchGenerator(
-                            reader, row_groups, column_projection,
-                            ::arrow::internal::GetCpuThreadPool(), rows_to_readahead));
+  // Use the executor from scan options if provided.
+  auto cpu_executor = options->cpu_executor ? options->cpu_executor
+                                            : ::arrow::internal::GetCpuThreadPool();
+  ARROW_ASSIGN_OR_RAISE(auto generator, reader->GetRecordBatchGenerator(
+                                            reader, row_groups, column_projection,
+                                            cpu_executor, rows_to_readahead));
RecordBatchGenerator sliced =
SlicingGenerator(std::move(generator), options->batch_size);
if (batch_readahead == 0) {
diff --git a/cpp/src/arrow/dataset/file_parquet_test.cc
b/cpp/src/arrow/dataset/file_parquet_test.cc
index 95f00c195c..696bda1935 100644
--- a/cpp/src/arrow/dataset/file_parquet_test.cc
+++ b/cpp/src/arrow/dataset/file_parquet_test.cc
@@ -17,6 +17,7 @@
#include "arrow/dataset/file_parquet.h"
+#include <functional>
#include <memory>
#include <thread>
#include <utility>
@@ -25,6 +26,7 @@
#include "arrow/compute/api_scalar.h"
#include "arrow/dataset/dataset_internal.h"
#include "arrow/dataset/parquet_encryption_config.h"
+#include "arrow/dataset/scanner.h"
#include "arrow/dataset/test_util_internal.h"
#include "arrow/io/interfaces.h"
#include "arrow/io/memory.h"
@@ -133,6 +135,29 @@ class ParquetFormatHelper {
}
};
+class DelayedBufferReader : public ::arrow::io::BufferReader {
+ public:
+ explicit DelayedBufferReader(const std::shared_ptr<::arrow::Buffer>& buffer)
+ : ::arrow::io::BufferReader(buffer) {}
+
+ ::arrow::Future<std::shared_ptr<Buffer>> ReadAsync(
+ const ::arrow::io::IOContext& io_context, int64_t position,
+ int64_t nbytes) override {
+ read_async_count.fetch_add(1);
+    auto self = std::dynamic_pointer_cast<DelayedBufferReader>(shared_from_this());
+    return DeferNotOk(::arrow::io::internal::SubmitIO(
+        io_context, [self, position, nbytes]() -> Result<std::shared_ptr<Buffer>> {
+ std::this_thread::sleep_for(std::chrono::seconds(1));
+ return self->DoReadAt(position, nbytes);
+ }));
+ }
+
+ std::atomic<int> read_async_count{0};
+};
+
+using CustomizeScanOptionsWithThreadPool =
+ std::function<void(ScanOptions&, arrow::internal::ThreadPool*)>;
+
class TestParquetFileFormat : public
FileFormatFixtureMixin<ParquetFormatHelper> {
public:
RecordBatchIterator Batches(Fragment* fragment) {
@@ -183,6 +208,51 @@ class TestParquetFileFormat : public
FileFormatFixtureMixin<ParquetFormatHelper>
EXPECT_EQ(SingleBatch(parquet_fragment.get())->num_rows(), expected + 1);
}
}
+
+ void TestMultithreadedRegression(CustomizeScanOptionsWithThreadPool
customizer) {
+ auto reader = MakeGeneratedRecordBatch(schema({field("utf8", utf8())}),
10000, 100);
+ ASSERT_OK_AND_ASSIGN(auto buffer,
ParquetFormatHelper::Write(reader.get()));
+
+ std::vector<Future<>> completes;
+ std::vector<std::shared_ptr<arrow::internal::ThreadPool>> pools;
+
+ for (int idx = 0; idx < 2; ++idx) {
+ auto buffer_reader = std::make_shared<DelayedBufferReader>(buffer);
+ auto source = std::make_shared<FileSource>(buffer_reader,
buffer->size());
+ auto fragment = MakeFragment(*source);
+ std::shared_ptr<Scanner> scanner;
+
+ {
+ auto options = std::make_shared<ScanOptions>();
+ ASSERT_OK_AND_ASSIGN(auto thread_pool,
arrow::internal::ThreadPool::Make(1));
+ pools.emplace_back(thread_pool);
+ customizer(*options, pools.back().get());
+ auto fragment_scan_options =
std::make_shared<ParquetFragmentScanOptions>();
+ fragment_scan_options->arrow_reader_properties->set_pre_buffer(true);
+
+ options->fragment_scan_options = fragment_scan_options;
+ ScannerBuilder builder(ArithmeticDatasetFixture::schema(), fragment,
options);
+
+ ASSERT_OK(builder.UseThreads(true));
+ ASSERT_OK(builder.BatchSize(10000));
+ ASSERT_OK_AND_ASSIGN(scanner, builder.Finish());
+ }
+
+ ASSERT_OK_AND_ASSIGN(auto batch, scanner->Head(10000));
+ [[maybe_unused]] auto fut = scanner->ScanBatchesUnorderedAsync();
+ // Random ReadAsync calls, generate some futures to make the state
machine
+ // more complex.
+ for (int yy = 0; yy < 16; yy++) {
+ completes.emplace_back(
+ buffer_reader->ReadAsync(::arrow::io::IOContext(), 0, 1001));
+ }
+ scanner = nullptr;
+ }
+
+ for (auto& f : completes) {
+ f.Wait();
+ }
+ }
};
TEST_F(TestParquetFileFormat, InspectFailureWithRelevantError) {
@@ -904,73 +974,25 @@ TEST(TestParquetStatistics, NoNullCount) {
}
}
-class DelayedBufferReader : public ::arrow::io::BufferReader {
- public:
- explicit DelayedBufferReader(const std::shared_ptr<::arrow::Buffer>& buffer)
- : ::arrow::io::BufferReader(buffer) {}
-
- ::arrow::Future<std::shared_ptr<Buffer>> ReadAsync(
- const ::arrow::io::IOContext& io_context, int64_t position,
- int64_t nbytes) override {
- read_async_count.fetch_add(1);
- auto self =
std::dynamic_pointer_cast<DelayedBufferReader>(shared_from_this());
- return DeferNotOk(::arrow::io::internal::SubmitIO(
- io_context, [self, position, nbytes]() ->
Result<std::shared_ptr<Buffer>> {
- std::this_thread::sleep_for(std::chrono::seconds(1));
- return self->DoReadAt(position, nbytes);
- }));
- }
-
- std::atomic<int> read_async_count{0};
-};
-
TEST_F(TestParquetFileFormat, MultithreadedScanRegression) {
  // GH-38438: This test is similar to MultithreadedScan, but it try to use self
  // designed Executor and DelayedBufferReader to mock async execution to make
  // the state machine more complex.
- auto reader = MakeGeneratedRecordBatch(schema({field("utf8", utf8())}),
10000, 100);
-
- ASSERT_OK_AND_ASSIGN(auto buffer, ParquetFormatHelper::Write(reader.get()));
-
- std::vector<Future<>> completes;
- std::vector<std::shared_ptr<arrow::internal::ThreadPool>> pools;
-
- for (int idx = 0; idx < 2; ++idx) {
- auto buffer_reader = std::make_shared<DelayedBufferReader>(buffer);
- auto source = std::make_shared<FileSource>(buffer_reader, buffer->size());
- auto fragment = MakeFragment(*source);
- std::shared_ptr<Scanner> scanner;
-
- {
- auto options = std::make_shared<ScanOptions>();
- ASSERT_OK_AND_ASSIGN(auto thread_pool,
arrow::internal::ThreadPool::Make(1));
- pools.emplace_back(thread_pool);
- options->io_context =
- ::arrow::io::IOContext(::arrow::default_memory_pool(),
pools.back().get());
- auto fragment_scan_options =
std::make_shared<ParquetFragmentScanOptions>();
- fragment_scan_options->arrow_reader_properties->set_pre_buffer(true);
-
- options->fragment_scan_options = fragment_scan_options;
- ScannerBuilder builder(ArithmeticDatasetFixture::schema(), fragment,
options);
-
- ASSERT_OK(builder.UseThreads(true));
- ASSERT_OK(builder.BatchSize(10000));
- ASSERT_OK_AND_ASSIGN(scanner, builder.Finish());
- }
-
- ASSERT_OK_AND_ASSIGN(auto batch, scanner->Head(10000));
- [[maybe_unused]] auto fut = scanner->ScanBatchesUnorderedAsync();
- // Random ReadAsync calls, generate some futures to make the state machine
- // more complex.
- for (int yy = 0; yy < 16; yy++) {
-
completes.emplace_back(buffer_reader->ReadAsync(::arrow::io::IOContext(), 0,
1001));
- }
- scanner = nullptr;
- }
+ CustomizeScanOptionsWithThreadPool customize_io_context =
+ [](ScanOptions& options, arrow::internal::ThreadPool* pool) {
+ options.io_context =
::arrow::io::IOContext(::arrow::default_memory_pool(), pool);
+ };
+ TestMultithreadedRegression(customize_io_context);
+}
- for (auto& f : completes) {
- f.Wait();
- }
+TEST_F(TestParquetFileFormat, MultithreadedComputeRegression) {
+ // GH-43694: Test similar situation as MultithreadedScanRegression but with
+ // the customized CPU executor instead
+ CustomizeScanOptionsWithThreadPool customize_cpu_executor =
+ [](ScanOptions& options, arrow::internal::ThreadPool* pool) {
+ options.cpu_executor = pool;
+ };
+ TestMultithreadedRegression(customize_cpu_executor);
}
} // namespace dataset
diff --git a/cpp/src/arrow/dataset/scanner.cc b/cpp/src/arrow/dataset/scanner.cc
index a8c8c6bde6..222e1323d4 100644
--- a/cpp/src/arrow/dataset/scanner.cc
+++ b/cpp/src/arrow/dataset/scanner.cc
@@ -360,8 +360,9 @@ class OneShotFragment : public Fragment {
ARROW_ASSIGN_OR_RAISE(
auto background_gen,
MakeBackgroundGenerator(std::move(batch_it_),
options->io_context.executor()));
- return MakeTransferredGenerator(std::move(background_gen),
- ::arrow::internal::GetCpuThreadPool());
+  auto cpu_executor = options->cpu_executor ? options->cpu_executor
+                                            : ::arrow::internal::GetCpuThreadPool();
+ return MakeTransferredGenerator(std::move(background_gen), cpu_executor);
}
std::string type_name() const override { return "one-shot"; }
@@ -387,7 +388,7 @@ Result<TaggedRecordBatchIterator>
AsyncScanner::ScanBatches() {
[this](::arrow::internal::Executor* executor) {
return ScanBatchesAsync(executor);
},
- scan_options_->use_threads);
+ scan_options_->use_threads, scan_options_->cpu_executor);
}
Result<EnumeratedRecordBatchIterator> AsyncScanner::ScanBatchesUnordered() {
@@ -395,7 +396,7 @@ Result<EnumeratedRecordBatchIterator>
AsyncScanner::ScanBatchesUnordered() {
[this](::arrow::internal::Executor* executor) {
return ScanBatchesUnorderedAsync(executor);
},
- scan_options_->use_threads);
+ scan_options_->use_threads, scan_options_->cpu_executor);
}
Result<std::shared_ptr<Table>> AsyncScanner::ToTable() {
@@ -405,7 +406,9 @@ Result<std::shared_ptr<Table>> AsyncScanner::ToTable() {
}
Result<EnumeratedRecordBatchGenerator>
AsyncScanner::ScanBatchesUnorderedAsync() {
- return ScanBatchesUnorderedAsync(::arrow::internal::GetCpuThreadPool(),
+ return ScanBatchesUnorderedAsync(scan_options_->cpu_executor
+ ? scan_options_->cpu_executor
+ : ::arrow::internal::GetCpuThreadPool(),
/*sequence_fragments=*/false);
}
@@ -606,7 +609,9 @@ Result<std::shared_ptr<Table>> AsyncScanner::Head(int64_t
num_rows) {
}
Result<TaggedRecordBatchGenerator> AsyncScanner::ScanBatchesAsync() {
- return ScanBatchesAsync(::arrow::internal::GetCpuThreadPool());
+ return ScanBatchesAsync(scan_options_->cpu_executor
+ ? scan_options_->cpu_executor
+ : ::arrow::internal::GetCpuThreadPool());
}
Result<TaggedRecordBatchGenerator> AsyncScanner::ScanBatchesAsync(
@@ -783,7 +788,9 @@ Future<int64_t> AsyncScanner::CountRowsAsync(Executor*
executor) {
}
Future<int64_t> AsyncScanner::CountRowsAsync() {
- return CountRowsAsync(::arrow::internal::GetCpuThreadPool());
+ return CountRowsAsync(scan_options_->cpu_executor
+ ? scan_options_->cpu_executor
+ : ::arrow::internal::GetCpuThreadPool());
}
Result<int64_t> AsyncScanner::CountRows() {
diff --git a/cpp/src/arrow/dataset/scanner.h b/cpp/src/arrow/dataset/scanner.h
index 50310577f1..7885b132cc 100644
--- a/cpp/src/arrow/dataset/scanner.h
+++ b/cpp/src/arrow/dataset/scanner.h
@@ -35,6 +35,7 @@
#include "arrow/type_fwd.h"
#include "arrow/util/async_generator_fwd.h"
#include "arrow/util/iterator.h"
+#include "arrow/util/thread_pool.h"
#include "arrow/util/type_fwd.h"
namespace arrow {
@@ -104,6 +105,13 @@ struct ARROW_DS_EXPORT ScanOptions {
/// Note: The IOContext executor will be ignored if use_threads is set to
false
io::IOContext io_context;
+ /// Executor for any CPU tasks
+ ///
+ /// If null, the global CPU executor will be used
+ ///
+ /// Note: The Executor will be ignored if use_threads is set to false
+ arrow::internal::Executor* cpu_executor = NULLPTR;
+
/// If true the scanner will scan in parallel
///
/// Note: If true, this will use threads from both the cpu_executor and the
diff --git a/cpp/src/arrow/util/thread_pool.h b/cpp/src/arrow/util/thread_pool.h
index 2e80f6e544..201b8cef79 100644
--- a/cpp/src/arrow/util/thread_pool.h
+++ b/cpp/src/arrow/util/thread_pool.h
@@ -593,9 +593,11 @@ typename Fut::SyncType
RunSynchronously(FnOnce<Fut(Executor*)> get_future,
}
/// \brief Potentially iterate an async generator serially (if use_threads is
false)
+/// using a potentially custom Executor
/// \see IterateGenerator
///
-/// If `use_threads` is true, the global CPU executor will be used. Each call
to
+/// If `use_threads` is true, the custom executor or, if null,
+/// the global CPU executor will be used. Each call to
/// the iterator will simply wait until the next item is available. Tasks
may run in
/// the background between calls.
///
@@ -605,9 +607,11 @@ typename Fut::SyncType
RunSynchronously(FnOnce<Fut(Executor*)> get_future,
/// calls.
template <typename T>
Iterator<T> IterateSynchronously(
- FnOnce<Result<std::function<Future<T>()>>(Executor*)> get_gen, bool
use_threads) {
+ FnOnce<Result<std::function<Future<T>()>>(Executor*)> get_gen, bool
use_threads,
+ Executor* executor) {
if (use_threads) {
- auto maybe_gen = std::move(get_gen)(GetCpuThreadPool());
+ auto used_executor = executor != NULLPTR ? executor : GetCpuThreadPool();
+ auto maybe_gen = std::move(get_gen)(used_executor);
if (!maybe_gen.ok()) {
return MakeErrorIterator<T>(maybe_gen.status());
}
@@ -617,5 +621,23 @@ Iterator<T> IterateSynchronously(
}
}
+/// \brief Potentially iterate an async generator serially (if use_threads is
false)
+/// using the default CPU thread pool
+/// \see IterateGenerator
+///
+/// If `use_threads` is true, the global CPU executor will be used. Each call
to
+/// the iterator will simply wait until the next item is available. Tasks
may run in
+/// the background between calls.
+///
+/// If `use_threads` is false, the calling thread only will be used. Each
call to
+/// the iterator will use the calling thread to do enough work to generate
one item.
+/// Tasks will be left in a queue until the next call and no work will be
done between
+/// calls.
+template <typename T>
+Iterator<T> IterateSynchronously(
+ FnOnce<Result<std::function<Future<T>()>>(Executor*)> get_gen, bool
use_threads) {
+ return IterateSynchronously(std::move(get_gen), use_threads, NULLPTR);
+}
+
} // namespace internal
} // namespace arrow