This is an automated email from the ASF dual-hosted git repository.

zanmato pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new b14a5378ad GH-43694: [C++] Add `Executor *` Option to 
`arrow::dataset::ScanOptions` (#43698)
b14a5378ad is described below

commit b14a5378ad7b5cbcad637d2839298dbf4711e29c
Author: Srinivas Lade <[email protected]>
AuthorDate: Thu Sep 11 08:48:06 2025 -0700

    GH-43694: [C++] Add `Executor *` Option to `arrow::dataset::ScanOptions` 
(#43698)
    
    ### Rationale for this change
    
    (See https://github.com/apache/arrow/issues/43694)
    
    ### What changes are included in this PR?
    
    Added the option `Executor* cpu_executor` to `arrow::dataset::ScanOptions` and 
modified the scanner and sub-functions to use either the explicitly provided 
executor or, when none is given, the default internal CPU thread pool.
    
    ### Are these changes tested?
    
    Added a Parquet scanner test that exercises the new executor option, using a 
separate thread pool for each fragment.
    
    ### Are there any user-facing changes?
    
    Yes, adds a new option. I'm not sure how to update the documentation though
    * GitHub Issue: #43694
    
    Lead-authored-by: Scott Routledge <[email protected]>
    Co-authored-by: Srinivas Lade <[email protected]>
    Co-authored-by: scott-routledge2 <[email protected]>
    Co-authored-by: Raúl Cumplido <[email protected]>
    Signed-off-by: Rossi Sun <[email protected]>
---
 cpp/src/arrow/dataset/file_parquet.cc      |  11 ++-
 cpp/src/arrow/dataset/file_parquet_test.cc | 146 +++++++++++++++++------------
 cpp/src/arrow/dataset/scanner.cc           |  21 +++--
 cpp/src/arrow/dataset/scanner.h            |   8 ++
 cpp/src/arrow/util/thread_pool.h           |  28 +++++-
 5 files changed, 138 insertions(+), 76 deletions(-)

diff --git a/cpp/src/arrow/dataset/file_parquet.cc 
b/cpp/src/arrow/dataset/file_parquet.cc
index 1912da40fc..7ef6061870 100644
--- a/cpp/src/arrow/dataset/file_parquet.cc
+++ b/cpp/src/arrow/dataset/file_parquet.cc
@@ -36,6 +36,7 @@
 #include "arrow/util/iterator.h"
 #include "arrow/util/logging_internal.h"
 #include "arrow/util/range.h"
+#include "arrow/util/thread_pool.h"
 #include "arrow/util/tracing_internal.h"
 #include "parquet/arrow/reader.h"
 #include "parquet/arrow/schema.h"
@@ -642,10 +643,12 @@ Result<RecordBatchGenerator> 
ParquetFileFormat::ScanBatchesAsync(
             kParquetTypeName, options.get(), default_fragment_scan_options));
     int batch_readahead = options->batch_readahead;
     int64_t rows_to_readahead = batch_readahead * options->batch_size;
-    ARROW_ASSIGN_OR_RAISE(auto generator,
-                          reader->GetRecordBatchGenerator(
-                              reader, row_groups, column_projection,
-                              ::arrow::internal::GetCpuThreadPool(), 
rows_to_readahead));
+    // Use the executor from scan options if provided.
+    auto cpu_executor = options->cpu_executor ? options->cpu_executor
+                                              : 
::arrow::internal::GetCpuThreadPool();
+    ARROW_ASSIGN_OR_RAISE(auto generator, reader->GetRecordBatchGenerator(
+                                              reader, row_groups, 
column_projection,
+                                              cpu_executor, 
rows_to_readahead));
     RecordBatchGenerator sliced =
         SlicingGenerator(std::move(generator), options->batch_size);
     if (batch_readahead == 0) {
diff --git a/cpp/src/arrow/dataset/file_parquet_test.cc 
b/cpp/src/arrow/dataset/file_parquet_test.cc
index 95f00c195c..696bda1935 100644
--- a/cpp/src/arrow/dataset/file_parquet_test.cc
+++ b/cpp/src/arrow/dataset/file_parquet_test.cc
@@ -17,6 +17,7 @@
 
 #include "arrow/dataset/file_parquet.h"
 
+#include <functional>
 #include <memory>
 #include <thread>
 #include <utility>
@@ -25,6 +26,7 @@
 #include "arrow/compute/api_scalar.h"
 #include "arrow/dataset/dataset_internal.h"
 #include "arrow/dataset/parquet_encryption_config.h"
+#include "arrow/dataset/scanner.h"
 #include "arrow/dataset/test_util_internal.h"
 #include "arrow/io/interfaces.h"
 #include "arrow/io/memory.h"
@@ -133,6 +135,29 @@ class ParquetFormatHelper {
   }
 };
 
+class DelayedBufferReader : public ::arrow::io::BufferReader {
+ public:
+  explicit DelayedBufferReader(const std::shared_ptr<::arrow::Buffer>& buffer)
+      : ::arrow::io::BufferReader(buffer) {}
+
+  ::arrow::Future<std::shared_ptr<Buffer>> ReadAsync(
+      const ::arrow::io::IOContext& io_context, int64_t position,
+      int64_t nbytes) override {
+    read_async_count.fetch_add(1);
+    auto self = 
std::dynamic_pointer_cast<DelayedBufferReader>(shared_from_this());
+    return DeferNotOk(::arrow::io::internal::SubmitIO(
+        io_context, [self, position, nbytes]() -> 
Result<std::shared_ptr<Buffer>> {
+          std::this_thread::sleep_for(std::chrono::seconds(1));
+          return self->DoReadAt(position, nbytes);
+        }));
+  }
+
+  std::atomic<int> read_async_count{0};
+};
+
+using CustomizeScanOptionsWithThreadPool =
+    std::function<void(ScanOptions&, arrow::internal::ThreadPool*)>;
+
 class TestParquetFileFormat : public 
FileFormatFixtureMixin<ParquetFormatHelper> {
  public:
   RecordBatchIterator Batches(Fragment* fragment) {
@@ -183,6 +208,51 @@ class TestParquetFileFormat : public 
FileFormatFixtureMixin<ParquetFormatHelper>
       EXPECT_EQ(SingleBatch(parquet_fragment.get())->num_rows(), expected + 1);
     }
   }
+
+  void TestMultithreadedRegression(CustomizeScanOptionsWithThreadPool 
customizer) {
+    auto reader = MakeGeneratedRecordBatch(schema({field("utf8", utf8())}), 
10000, 100);
+    ASSERT_OK_AND_ASSIGN(auto buffer, 
ParquetFormatHelper::Write(reader.get()));
+
+    std::vector<Future<>> completes;
+    std::vector<std::shared_ptr<arrow::internal::ThreadPool>> pools;
+
+    for (int idx = 0; idx < 2; ++idx) {
+      auto buffer_reader = std::make_shared<DelayedBufferReader>(buffer);
+      auto source = std::make_shared<FileSource>(buffer_reader, 
buffer->size());
+      auto fragment = MakeFragment(*source);
+      std::shared_ptr<Scanner> scanner;
+
+      {
+        auto options = std::make_shared<ScanOptions>();
+        ASSERT_OK_AND_ASSIGN(auto thread_pool, 
arrow::internal::ThreadPool::Make(1));
+        pools.emplace_back(thread_pool);
+        customizer(*options, pools.back().get());
+        auto fragment_scan_options = 
std::make_shared<ParquetFragmentScanOptions>();
+        fragment_scan_options->arrow_reader_properties->set_pre_buffer(true);
+
+        options->fragment_scan_options = fragment_scan_options;
+        ScannerBuilder builder(ArithmeticDatasetFixture::schema(), fragment, 
options);
+
+        ASSERT_OK(builder.UseThreads(true));
+        ASSERT_OK(builder.BatchSize(10000));
+        ASSERT_OK_AND_ASSIGN(scanner, builder.Finish());
+      }
+
+      ASSERT_OK_AND_ASSIGN(auto batch, scanner->Head(10000));
+      [[maybe_unused]] auto fut = scanner->ScanBatchesUnorderedAsync();
+      // Random ReadAsync calls, generate some futures to make the state 
machine
+      // more complex.
+      for (int yy = 0; yy < 16; yy++) {
+        completes.emplace_back(
+            buffer_reader->ReadAsync(::arrow::io::IOContext(), 0, 1001));
+      }
+      scanner = nullptr;
+    }
+
+    for (auto& f : completes) {
+      f.Wait();
+    }
+  }
 };
 
 TEST_F(TestParquetFileFormat, InspectFailureWithRelevantError) {
@@ -904,73 +974,25 @@ TEST(TestParquetStatistics, NoNullCount) {
   }
 }
 
-class DelayedBufferReader : public ::arrow::io::BufferReader {
- public:
-  explicit DelayedBufferReader(const std::shared_ptr<::arrow::Buffer>& buffer)
-      : ::arrow::io::BufferReader(buffer) {}
-
-  ::arrow::Future<std::shared_ptr<Buffer>> ReadAsync(
-      const ::arrow::io::IOContext& io_context, int64_t position,
-      int64_t nbytes) override {
-    read_async_count.fetch_add(1);
-    auto self = 
std::dynamic_pointer_cast<DelayedBufferReader>(shared_from_this());
-    return DeferNotOk(::arrow::io::internal::SubmitIO(
-        io_context, [self, position, nbytes]() -> 
Result<std::shared_ptr<Buffer>> {
-          std::this_thread::sleep_for(std::chrono::seconds(1));
-          return self->DoReadAt(position, nbytes);
-        }));
-  }
-
-  std::atomic<int> read_async_count{0};
-};
-
 TEST_F(TestParquetFileFormat, MultithreadedScanRegression) {
   // GH-38438: This test is similar to MultithreadedScan, but it try to use 
self
   // designed Executor and DelayedBufferReader to mock async execution to make
   // the state machine more complex.
-  auto reader = MakeGeneratedRecordBatch(schema({field("utf8", utf8())}), 
10000, 100);
-
-  ASSERT_OK_AND_ASSIGN(auto buffer, ParquetFormatHelper::Write(reader.get()));
-
-  std::vector<Future<>> completes;
-  std::vector<std::shared_ptr<arrow::internal::ThreadPool>> pools;
-
-  for (int idx = 0; idx < 2; ++idx) {
-    auto buffer_reader = std::make_shared<DelayedBufferReader>(buffer);
-    auto source = std::make_shared<FileSource>(buffer_reader, buffer->size());
-    auto fragment = MakeFragment(*source);
-    std::shared_ptr<Scanner> scanner;
-
-    {
-      auto options = std::make_shared<ScanOptions>();
-      ASSERT_OK_AND_ASSIGN(auto thread_pool, 
arrow::internal::ThreadPool::Make(1));
-      pools.emplace_back(thread_pool);
-      options->io_context =
-          ::arrow::io::IOContext(::arrow::default_memory_pool(), 
pools.back().get());
-      auto fragment_scan_options = 
std::make_shared<ParquetFragmentScanOptions>();
-      fragment_scan_options->arrow_reader_properties->set_pre_buffer(true);
-
-      options->fragment_scan_options = fragment_scan_options;
-      ScannerBuilder builder(ArithmeticDatasetFixture::schema(), fragment, 
options);
-
-      ASSERT_OK(builder.UseThreads(true));
-      ASSERT_OK(builder.BatchSize(10000));
-      ASSERT_OK_AND_ASSIGN(scanner, builder.Finish());
-    }
-
-    ASSERT_OK_AND_ASSIGN(auto batch, scanner->Head(10000));
-    [[maybe_unused]] auto fut = scanner->ScanBatchesUnorderedAsync();
-    // Random ReadAsync calls, generate some futures to make the state machine
-    // more complex.
-    for (int yy = 0; yy < 16; yy++) {
-      
completes.emplace_back(buffer_reader->ReadAsync(::arrow::io::IOContext(), 0, 
1001));
-    }
-    scanner = nullptr;
-  }
+  CustomizeScanOptionsWithThreadPool customize_io_context =
+      [](ScanOptions& options, arrow::internal::ThreadPool* pool) {
+        options.io_context = 
::arrow::io::IOContext(::arrow::default_memory_pool(), pool);
+      };
+  TestMultithreadedRegression(customize_io_context);
+}
 
-  for (auto& f : completes) {
-    f.Wait();
-  }
+TEST_F(TestParquetFileFormat, MultithreadedComputeRegression) {
+  // GH-43694: Test similar situation as MultithreadedScanRegression but with
+  // the customized CPU executor instead
+  CustomizeScanOptionsWithThreadPool customize_cpu_executor =
+      [](ScanOptions& options, arrow::internal::ThreadPool* pool) {
+        options.cpu_executor = pool;
+      };
+  TestMultithreadedRegression(customize_cpu_executor);
 }
 
 }  // namespace dataset
diff --git a/cpp/src/arrow/dataset/scanner.cc b/cpp/src/arrow/dataset/scanner.cc
index a8c8c6bde6..222e1323d4 100644
--- a/cpp/src/arrow/dataset/scanner.cc
+++ b/cpp/src/arrow/dataset/scanner.cc
@@ -360,8 +360,9 @@ class OneShotFragment : public Fragment {
     ARROW_ASSIGN_OR_RAISE(
         auto background_gen,
         MakeBackgroundGenerator(std::move(batch_it_), 
options->io_context.executor()));
-    return MakeTransferredGenerator(std::move(background_gen),
-                                    ::arrow::internal::GetCpuThreadPool());
+    auto cpu_executor = options->cpu_executor ? options->cpu_executor
+                                              : 
::arrow::internal::GetCpuThreadPool();
+    return MakeTransferredGenerator(std::move(background_gen), cpu_executor);
   }
   std::string type_name() const override { return "one-shot"; }
 
@@ -387,7 +388,7 @@ Result<TaggedRecordBatchIterator> 
AsyncScanner::ScanBatches() {
       [this](::arrow::internal::Executor* executor) {
         return ScanBatchesAsync(executor);
       },
-      scan_options_->use_threads);
+      scan_options_->use_threads, scan_options_->cpu_executor);
 }
 
 Result<EnumeratedRecordBatchIterator> AsyncScanner::ScanBatchesUnordered() {
@@ -395,7 +396,7 @@ Result<EnumeratedRecordBatchIterator> 
AsyncScanner::ScanBatchesUnordered() {
       [this](::arrow::internal::Executor* executor) {
         return ScanBatchesUnorderedAsync(executor);
       },
-      scan_options_->use_threads);
+      scan_options_->use_threads, scan_options_->cpu_executor);
 }
 
 Result<std::shared_ptr<Table>> AsyncScanner::ToTable() {
@@ -405,7 +406,9 @@ Result<std::shared_ptr<Table>> AsyncScanner::ToTable() {
 }
 
 Result<EnumeratedRecordBatchGenerator> 
AsyncScanner::ScanBatchesUnorderedAsync() {
-  return ScanBatchesUnorderedAsync(::arrow::internal::GetCpuThreadPool(),
+  return ScanBatchesUnorderedAsync(scan_options_->cpu_executor
+                                       ? scan_options_->cpu_executor
+                                       : ::arrow::internal::GetCpuThreadPool(),
                                    /*sequence_fragments=*/false);
 }
 
@@ -606,7 +609,9 @@ Result<std::shared_ptr<Table>> AsyncScanner::Head(int64_t 
num_rows) {
 }
 
 Result<TaggedRecordBatchGenerator> AsyncScanner::ScanBatchesAsync() {
-  return ScanBatchesAsync(::arrow::internal::GetCpuThreadPool());
+  return ScanBatchesAsync(scan_options_->cpu_executor
+                              ? scan_options_->cpu_executor
+                              : ::arrow::internal::GetCpuThreadPool());
 }
 
 Result<TaggedRecordBatchGenerator> AsyncScanner::ScanBatchesAsync(
@@ -783,7 +788,9 @@ Future<int64_t> AsyncScanner::CountRowsAsync(Executor* 
executor) {
 }
 
 Future<int64_t> AsyncScanner::CountRowsAsync() {
-  return CountRowsAsync(::arrow::internal::GetCpuThreadPool());
+  return CountRowsAsync(scan_options_->cpu_executor
+                            ? scan_options_->cpu_executor
+                            : ::arrow::internal::GetCpuThreadPool());
 }
 
 Result<int64_t> AsyncScanner::CountRows() {
diff --git a/cpp/src/arrow/dataset/scanner.h b/cpp/src/arrow/dataset/scanner.h
index 50310577f1..7885b132cc 100644
--- a/cpp/src/arrow/dataset/scanner.h
+++ b/cpp/src/arrow/dataset/scanner.h
@@ -35,6 +35,7 @@
 #include "arrow/type_fwd.h"
 #include "arrow/util/async_generator_fwd.h"
 #include "arrow/util/iterator.h"
+#include "arrow/util/thread_pool.h"
 #include "arrow/util/type_fwd.h"
 
 namespace arrow {
@@ -104,6 +105,13 @@ struct ARROW_DS_EXPORT ScanOptions {
   /// Note: The IOContext executor will be ignored if use_threads is set to 
false
   io::IOContext io_context;
 
+  /// Executor for any CPU tasks
+  ///
+  /// If null, the global CPU executor will be used
+  ///
+  /// Note: The Executor will be ignored if use_threads is set to false
+  arrow::internal::Executor* cpu_executor = NULLPTR;
+
   /// If true the scanner will scan in parallel
   ///
   /// Note: If true, this will use threads from both the cpu_executor and the
diff --git a/cpp/src/arrow/util/thread_pool.h b/cpp/src/arrow/util/thread_pool.h
index 2e80f6e544..201b8cef79 100644
--- a/cpp/src/arrow/util/thread_pool.h
+++ b/cpp/src/arrow/util/thread_pool.h
@@ -593,9 +593,11 @@ typename Fut::SyncType 
RunSynchronously(FnOnce<Fut(Executor*)> get_future,
 }
 
 /// \brief Potentially iterate an async generator serially (if use_threads is 
false)
+///   using a potentially custom Executor
 /// \see IterateGenerator
 ///
-/// If `use_threads` is true, the global CPU executor will be used.  Each call 
to
+/// If `use_threads` is true, the custom executor or, if null,
+///   the global CPU executor will be used.  Each call to
 ///   the iterator will simply wait until the next item is available.  Tasks 
may run in
 ///   the background between calls.
 ///
@@ -605,9 +607,11 @@ typename Fut::SyncType 
RunSynchronously(FnOnce<Fut(Executor*)> get_future,
 ///   calls.
 template <typename T>
 Iterator<T> IterateSynchronously(
-    FnOnce<Result<std::function<Future<T>()>>(Executor*)> get_gen, bool 
use_threads) {
+    FnOnce<Result<std::function<Future<T>()>>(Executor*)> get_gen, bool 
use_threads,
+    Executor* executor) {
   if (use_threads) {
-    auto maybe_gen = std::move(get_gen)(GetCpuThreadPool());
+    auto used_executor = executor != NULLPTR ? executor : GetCpuThreadPool();
+    auto maybe_gen = std::move(get_gen)(used_executor);
     if (!maybe_gen.ok()) {
       return MakeErrorIterator<T>(maybe_gen.status());
     }
@@ -617,5 +621,23 @@ Iterator<T> IterateSynchronously(
   }
 }
 
+/// \brief Potentially iterate an async generator serially (if use_threads is 
false)
+///   using the default CPU thread pool
+/// \see IterateGenerator
+///
+/// If `use_threads` is true, the global CPU executor will be used.  Each call 
to
+///   the iterator will simply wait until the next item is available.  Tasks 
may run in
+///   the background between calls.
+///
+/// If `use_threads` is false, the calling thread only will be used.  Each 
call to
+///   the iterator will use the calling thread to do enough work to generate 
one item.
+///   Tasks will be left in a queue until the next call and no work will be 
done between
+///   calls.
+template <typename T>
+Iterator<T> IterateSynchronously(
+    FnOnce<Result<std::function<Future<T>()>>(Executor*)> get_gen, bool 
use_threads) {
+  return IterateSynchronously(std::move(get_gen), use_threads, NULLPTR);
+}
+
 }  // namespace internal
 }  // namespace arrow

Reply via email to