This is an automated email from the ASF dual-hosted git repository.

westonpace pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 6bd31f37ae GH-35125: [C++][Acero] Add a self-defined io-executor in 
QueryOptions (#35464)
6bd31f37ae is described below

commit 6bd31f37ae66bd35594b077cb2f830be57e08acd
Author: mwish <[email protected]>
AuthorDate: Thu May 25 14:13:49 2023 +0800

    GH-35125: [C++][Acero] Add a self-defined io-executor in QueryOptions 
(#35464)
    
    
    
    ### Rationale for this change
    
    Support a custom io-executor in Acero QueryOptions.
    
    ### What changes are included in this PR?
    
    Allow users to define a custom io-executor.
    
    ### Are these changes tested?
    
    Currently not
    
    ### Are there any user-facing changes?
    
    Users can specify a custom io-executor in QueryOptions.
    
    * Closes: #35125
    
    Authored-by: mwish <[email protected]>
    Signed-off-by: Weston Pace <[email protected]>
---
 cpp/examples/arrow/dataset_parquet_scan_example.cc        |  2 +-
 .../arrow/execution_plan_documentation_examples.cc        |  2 +-
 cpp/src/arrow/acero/exec_plan.cc                          |  2 +-
 cpp/src/arrow/acero/exec_plan.h                           | 15 +++++++++++----
 cpp/src/arrow/acero/query_context.cc                      | 13 +++++++++++--
 cpp/src/arrow/compute/registry.h                          |  2 +-
 6 files changed, 26 insertions(+), 10 deletions(-)

diff --git a/cpp/examples/arrow/dataset_parquet_scan_example.cc 
b/cpp/examples/arrow/dataset_parquet_scan_example.cc
index 2a2a31245d..29b1060db4 100644
--- a/cpp/examples/arrow/dataset_parquet_scan_example.cc
+++ b/cpp/examples/arrow/dataset_parquet_scan_example.cc
@@ -32,7 +32,7 @@
  * \brief Run Example
  *
  * Make sure there is a parquet dataset with the column_names
- * mentioned in the Congiguration struct.
+ * mentioned in the Configuration struct.
  *
  * Example run:
  * ./dataset_parquet_scan_example file:///<some-path>/data.parquet
diff --git a/cpp/examples/arrow/execution_plan_documentation_examples.cc 
b/cpp/examples/arrow/execution_plan_documentation_examples.cc
index afdbb112c1..00a23be293 100644
--- a/cpp/examples/arrow/execution_plan_documentation_examples.cc
+++ b/cpp/examples/arrow/execution_plan_documentation_examples.cc
@@ -299,7 +299,7 @@ arrow::Status ScanSinkExample() {
 /// in an execution plan. This includes source node using pregenerated
 /// data and collecting it into a table.
 ///
-/// This sort of custom souce is often not needed.  In most cases you can
+/// This sort of custom source is often not needed.  In most cases you can
 /// use a scan (for a dataset source) or a source like table_source, 
array_vector_source,
 /// exec_batch_source, or record_batch_source (for in-memory data)
 arrow::Status SourceSinkExample() {
diff --git a/cpp/src/arrow/acero/exec_plan.cc b/cpp/src/arrow/acero/exec_plan.cc
index 432a859576..2fe8c484e4 100644
--- a/cpp/src/arrow/acero/exec_plan.cc
+++ b/cpp/src/arrow/acero/exec_plan.cc
@@ -105,7 +105,7 @@ struct ExecPlanImpl : public ExecPlan {
     if (query_context_.exec_context()->executor() == nullptr) {
       finished_.MarkFinished(Status::Invalid(
           "An exec plan must have an executor for CPU tasks.  To run without 
threads use "
-          "a SerialExeuctor (the arrow::compute::DeclarationTo... methods 
should take "
+          "a SerialExecutor (the arrow::compute::DeclarationTo... methods 
should take "
           "care of this for you and are an easier way to execute an 
ExecPlan.)"));
       return;
     }
diff --git a/cpp/src/arrow/acero/exec_plan.h b/cpp/src/arrow/acero/exec_plan.h
index 45b30d973f..424d9107d2 100644
--- a/cpp/src/arrow/acero/exec_plan.h
+++ b/cpp/src/arrow/acero/exec_plan.h
@@ -177,9 +177,9 @@ class ARROW_ACERO_EXPORT ExecNode {
   /// non-deterministic.  For example, a hash-join has no predictable output 
order.
   ///
   /// If the ordering is Ordering::Implicit then there is a meaningful order 
but that
-  /// odering is not represented by any column in the data.  The most common 
case for this
-  /// is when reading data from an in-memory table.  The data has an implicit 
"row order"
-  /// which is not neccesarily represented in the data set.
+  /// ordering is not represented by any column in the data.  The most common 
case for
+  /// this is when reading data from an in-memory table.  The data has an 
implicit "row
+  /// order" which is not necessarily represented in the data set.
   ///
   /// A filter or project node will not modify the ordering.  Nothing needs to 
be done
   /// other than ensure the index assigned to output batches is the same as the
@@ -516,7 +516,7 @@ struct ARROW_ACERO_EXPORT QueryOptions {
   /// otherwise.
   ///
   /// If explicitly set to true then plan execution will fail if there is no
-  /// meaningful ordering.  This can be useful to valdiate a query that should
+  /// meaningful ordering.  This can be useful to validate a query that should
   /// be emitting ordered results.
   ///
   /// If explicitly set to false then batches will be emit immediately even if 
there
@@ -540,6 +540,13 @@ struct ARROW_ACERO_EXPORT QueryOptions {
   /// the `use_threads` option.
   ::arrow::internal::Executor* custom_cpu_executor = NULLPTR;
 
+  /// \brief custom executor to use for IO work
+  ///
+  /// Must be null or remain valid for the duration of the plan.  If this is 
null then
+  /// the global io thread pool will be chosen whose behavior will be 
controlled by
+  /// the "ARROW_IO_THREADS" environment.
+  ::arrow::internal::Executor* custom_io_executor = NULLPTR;
+
   /// \brief a memory pool to use for allocations
   ///
   /// Must remain valid for the duration of the plan.
diff --git a/cpp/src/arrow/acero/query_context.cc 
b/cpp/src/arrow/acero/query_context.cc
index d1a9a7774e..9f838508fc 100644
--- a/cpp/src/arrow/acero/query_context.cc
+++ b/cpp/src/arrow/acero/query_context.cc
@@ -23,10 +23,19 @@ namespace arrow {
 using arrow::internal::CpuInfo;
 namespace acero {
 
+namespace {
+io::IOContext GetIoContext(const QueryOptions& opts, const ExecContext& 
exec_context) {
+  if (opts.custom_io_executor == nullptr) {
+    return io::IOContext(exec_context.memory_pool());
+  }
+  return io::IOContext(exec_context.memory_pool(), opts.custom_io_executor);
+}
+}  // namespace
+
 QueryContext::QueryContext(QueryOptions opts, ExecContext exec_context)
-    : options_(opts),
+    : options_(std::move(opts)),
       exec_context_(exec_context),
-      io_context_(exec_context_.memory_pool()) {}
+      io_context_(GetIoContext(options_, exec_context_)) {}
 
 const CpuInfo* QueryContext::cpu_info() const { return CpuInfo::GetInstance(); 
}
 int64_t QueryContext::hardware_flags() const { return 
cpu_info()->hardware_flags(); }
diff --git a/cpp/src/arrow/compute/registry.h b/cpp/src/arrow/compute/registry.h
index a7eb4bcf34..afd9f2007b 100644
--- a/cpp/src/arrow/compute/registry.h
+++ b/cpp/src/arrow/compute/registry.h
@@ -55,7 +55,7 @@ class ARROW_EXPORT FunctionRegistry {
   /// \brief Construct a new nested registry with the given parent.
   ///
   /// Most users only need to use the global registry. The returned registry 
never changes
-  /// its parent, even when an operation allows overwritting.
+  /// its parent, even when an operation allows overwriting.
   static std::unique_ptr<FunctionRegistry> Make(FunctionRegistry* parent);
 
   /// \brief Check whether a new function can be added to the registry.

Reply via email to