This is an automated email from the ASF dual-hosted git repository.
westonpace pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 6bd31f37ae GH-35125: [C++][Acero] Add a self-defined io-executor in
QueryOptions (#35464)
6bd31f37ae is described below
commit 6bd31f37ae66bd35594b077cb2f830be57e08acd
Author: mwish <[email protected]>
AuthorDate: Thu May 25 14:13:49 2023 +0800
GH-35125: [C++][Acero] Add a self-defined io-executor in QueryOptions
(#35464)
### Rationale for this change
Support a custom io-executor in Acero QueryOptions.
### What changes are included in this PR?
Allow users to define a custom io-executor.
### Are these changes tested?
Not currently.
### Are there any user-facing changes?
Users can specify an io-executor via the new `custom_io_executor` field in QueryOptions.
* Closes: #35125
Authored-by: mwish <[email protected]>
Signed-off-by: Weston Pace <[email protected]>
---
cpp/examples/arrow/dataset_parquet_scan_example.cc | 2 +-
.../arrow/execution_plan_documentation_examples.cc | 2 +-
cpp/src/arrow/acero/exec_plan.cc | 2 +-
cpp/src/arrow/acero/exec_plan.h | 15 +++++++++++----
cpp/src/arrow/acero/query_context.cc | 13 +++++++++++--
cpp/src/arrow/compute/registry.h | 2 +-
6 files changed, 26 insertions(+), 10 deletions(-)
diff --git a/cpp/examples/arrow/dataset_parquet_scan_example.cc
b/cpp/examples/arrow/dataset_parquet_scan_example.cc
index 2a2a31245d..29b1060db4 100644
--- a/cpp/examples/arrow/dataset_parquet_scan_example.cc
+++ b/cpp/examples/arrow/dataset_parquet_scan_example.cc
@@ -32,7 +32,7 @@
* \brief Run Example
*
* Make sure there is a parquet dataset with the column_names
- * mentioned in the Congiguration struct.
+ * mentioned in the Configuration struct.
*
* Example run:
* ./dataset_parquet_scan_example file:///<some-path>/data.parquet
diff --git a/cpp/examples/arrow/execution_plan_documentation_examples.cc
b/cpp/examples/arrow/execution_plan_documentation_examples.cc
index afdbb112c1..00a23be293 100644
--- a/cpp/examples/arrow/execution_plan_documentation_examples.cc
+++ b/cpp/examples/arrow/execution_plan_documentation_examples.cc
@@ -299,7 +299,7 @@ arrow::Status ScanSinkExample() {
/// in an execution plan. This includes source node using pregenerated
/// data and collecting it into a table.
///
-/// This sort of custom souce is often not needed. In most cases you can
+/// This sort of custom source is often not needed. In most cases you can
/// use a scan (for a dataset source) or a source like table_source,
array_vector_source,
/// exec_batch_source, or record_batch_source (for in-memory data)
arrow::Status SourceSinkExample() {
diff --git a/cpp/src/arrow/acero/exec_plan.cc b/cpp/src/arrow/acero/exec_plan.cc
index 432a859576..2fe8c484e4 100644
--- a/cpp/src/arrow/acero/exec_plan.cc
+++ b/cpp/src/arrow/acero/exec_plan.cc
@@ -105,7 +105,7 @@ struct ExecPlanImpl : public ExecPlan {
if (query_context_.exec_context()->executor() == nullptr) {
finished_.MarkFinished(Status::Invalid(
"An exec plan must have an executor for CPU tasks. To run without
threads use "
- "a SerialExeuctor (the arrow::compute::DeclarationTo... methods
should take "
+ "a SerialExecutor (the arrow::compute::DeclarationTo... methods
should take "
"care of this for you and are an easier way to execute an
ExecPlan.)"));
return;
}
diff --git a/cpp/src/arrow/acero/exec_plan.h b/cpp/src/arrow/acero/exec_plan.h
index 45b30d973f..424d9107d2 100644
--- a/cpp/src/arrow/acero/exec_plan.h
+++ b/cpp/src/arrow/acero/exec_plan.h
@@ -177,9 +177,9 @@ class ARROW_ACERO_EXPORT ExecNode {
/// non-deterministic. For example, a hash-join has no predictable output
order.
///
/// If the ordering is Ordering::Implicit then there is a meaningful order
but that
- /// odering is not represented by any column in the data. The most common
case for this
- /// is when reading data from an in-memory table. The data has an implicit
"row order"
- /// which is not neccesarily represented in the data set.
+ /// ordering is not represented by any column in the data. The most common
case for
+ /// this is when reading data from an in-memory table. The data has an
implicit "row
+ /// order" which is not necessarily represented in the data set.
///
/// A filter or project node will not modify the ordering. Nothing needs to
be done
/// other than ensure the index assigned to output batches is the same as the
@@ -516,7 +516,7 @@ struct ARROW_ACERO_EXPORT QueryOptions {
/// otherwise.
///
/// If explicitly set to true then plan execution will fail if there is no
- /// meaningful ordering. This can be useful to valdiate a query that should
+ /// meaningful ordering. This can be useful to validate a query that should
/// be emitting ordered results.
///
/// If explicitly set to false then batches will be emit immediately even if
there
@@ -540,6 +540,13 @@ struct ARROW_ACERO_EXPORT QueryOptions {
/// the `use_threads` option.
::arrow::internal::Executor* custom_cpu_executor = NULLPTR;
+ /// \brief custom executor to use for IO work
+ ///
+ /// Must be null or remain valid for the duration of the plan. If this is
null then
+ /// the global io thread pool will be chosen whose behavior will be
controlled by
+ /// the "ARROW_IO_THREADS" environment.
+ ::arrow::internal::Executor* custom_io_executor = NULLPTR;
+
/// \brief a memory pool to use for allocations
///
/// Must remain valid for the duration of the plan.
diff --git a/cpp/src/arrow/acero/query_context.cc
b/cpp/src/arrow/acero/query_context.cc
index d1a9a7774e..9f838508fc 100644
--- a/cpp/src/arrow/acero/query_context.cc
+++ b/cpp/src/arrow/acero/query_context.cc
@@ -23,10 +23,19 @@ namespace arrow {
using arrow::internal::CpuInfo;
namespace acero {
+namespace {
+io::IOContext GetIoContext(const QueryOptions& opts, const ExecContext&
exec_context) {
+ if (opts.custom_io_executor == nullptr) {
+ return io::IOContext(exec_context.memory_pool());
+ }
+ return io::IOContext(exec_context.memory_pool(), opts.custom_io_executor);
+}
+} // namespace
+
QueryContext::QueryContext(QueryOptions opts, ExecContext exec_context)
- : options_(opts),
+ : options_(std::move(opts)),
exec_context_(exec_context),
- io_context_(exec_context_.memory_pool()) {}
+ io_context_(GetIoContext(options_, exec_context_)) {}
const CpuInfo* QueryContext::cpu_info() const { return CpuInfo::GetInstance();
}
int64_t QueryContext::hardware_flags() const { return
cpu_info()->hardware_flags(); }
diff --git a/cpp/src/arrow/compute/registry.h b/cpp/src/arrow/compute/registry.h
index a7eb4bcf34..afd9f2007b 100644
--- a/cpp/src/arrow/compute/registry.h
+++ b/cpp/src/arrow/compute/registry.h
@@ -55,7 +55,7 @@ class ARROW_EXPORT FunctionRegistry {
/// \brief Construct a new nested registry with the given parent.
///
/// Most users only need to use the global registry. The returned registry
never changes
- /// its parent, even when an operation allows overwritting.
+ /// its parent, even when an operation allows overwriting.
static std::unique_ptr<FunctionRegistry> Make(FunctionRegistry* parent);
/// \brief Check whether a new function can be added to the registry.