This is an automated email from the ASF dual-hosted git repository.
jorisvandenbossche pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new a3cd962b8b GH-33976: [Python] Initial bindings for acero Declaration and ExecNodeOptions classes (#34102)
a3cd962b8b is described below
commit a3cd962b8bc5c49f33824202dff382af13cfa873
Author: Joris Van den Bossche <[email protected]>
AuthorDate: Fri Mar 3 13:46:53 2023 +0100
GH-33976: [Python] Initial bindings for acero Declaration and ExecNodeOptions classes (#34102)
First step for GH-33976, adding basic bindings for the different
ExecNodeOptions classes and the Declaration class to combine those in a query.
Some notes on what is and what is not included in this PR:
* For source nodes, didn't expose the generic `SourceNodeOptions` et al,
only the concrete `TableSourceNodeOptions` (should probably also add
`RecordBatchReaderSourceNodeOptions`)
* Didn't yet expose any sink nodes. The table sink is implicitly used by
`Declaration.to_table()`, and given that there is currently no explicit API to
manually convert to ExecPlan and execute it, explicit table sink node bindings
didn't seem necessary.
* Also didn't yet expose the order_by sink node, because this requires a
custom sink when collecting as a Table, and it's not directly clear how this is
possible with the Declaration interface. This requires
https://github.com/apache/arrow/issues/34248 to be fixed first.
* Leaving dataset-based scan and write nodes for a follow-up PR
* Basic class for `Declaration` with a `to_table` method to execute the
plan and consume it into a Table, and a `to_reader()` to get a
RecordBatchReader (could also further add a `to_batches()` method)
--
* Issue: #33976
Lead-authored-by: Joris Van den Bossche <[email protected]>
Co-authored-by: Weston Pace <[email protected]>
Signed-off-by: Joris Van den Bossche <[email protected]>
---
cpp/src/arrow/compute/exec/options.h | 3 +-
docs/source/cpp/streaming_execution.rst | 12 +-
python/CMakeLists.txt | 1 +
python/pyarrow/{_compute.pxd => _acero.pxd} | 39 +--
python/pyarrow/_acero.pyx | 477 ++++++++++++++++++++++++++++
python/pyarrow/_compute.pxd | 2 +
python/pyarrow/_compute.pyx | 63 ++--
python/pyarrow/includes/libarrow.pxd | 17 +
python/pyarrow/tests/test_acero.py | 293 +++++++++++++++++
python/setup.py | 1 +
10 files changed, 844 insertions(+), 64 deletions(-)
diff --git a/cpp/src/arrow/compute/exec/options.h b/cpp/src/arrow/compute/exec/options.h
index bc37144a3c..dc754f7004 100644
--- a/cpp/src/arrow/compute/exec/options.h
+++ b/cpp/src/arrow/compute/exec/options.h
@@ -204,7 +204,8 @@ class ARROW_EXPORT FetchNodeOptions : public ExecNodeOptions {
int64_t count;
};
-/// \brief Make a node which executes expressions on input batches, producing new batches.
+/// \brief Make a node which executes expressions on input batches, producing batches
+/// of the same length with new columns.
///
/// Each expression will be evaluated against each batch which is pushed to
/// this node to produce a corresponding output column.
diff --git a/docs/source/cpp/streaming_execution.rst b/docs/source/cpp/streaming_execution.rst
index ffe4fccbd2..49cc9b7fb7 100644
--- a/docs/source/cpp/streaming_execution.rst
+++ b/docs/source/cpp/streaming_execution.rst
@@ -648,8 +648,9 @@ Example of using ``table_source``
----------
``filter`` operation, as the name suggests, provides an option to define data filtering
-criteria. It selects rows matching a given expression. Filters can be written using
-:class:`arrow::compute::Expression`. For example, if we wish to keep rows where the value
+criteria. It selects rows where the given expression evaluates to true. Filters can be written using
+:class:`arrow::compute::Expression`, and the expression should have a return type of boolean.
+For example, if we wish to keep rows where the value
of column ``b`` is greater than 3, then we can use the following expression.
Filter example:
@@ -668,8 +669,11 @@ Filter example:
``project`` operation rearranges, deletes, transforms, and creates columns.
Each output column is computed by evaluating an expression
-against the source record batch. This is exposed via
-:class:`arrow::compute::ProjectNodeOptions` which requires,
+against the source record batch. These must be scalar expressions
+(expressions consisting of scalar literals, field references and scalar
+functions, i.e. elementwise functions that return one value for each input
+row independent of the value of all other rows).
+This is exposed via :class:`arrow::compute::ProjectNodeOptions` which requires,
an :class:`arrow::compute::Expression` and name for each of the output columns (if names are not
provided, the string representations of exprs will be used).
diff --git a/python/CMakeLists.txt b/python/CMakeLists.txt
index a2cf8901ba..8f846348f3 100644
--- a/python/CMakeLists.txt
+++ b/python/CMakeLists.txt
@@ -525,6 +525,7 @@ set(CYTHON_EXTENSIONS
_compute
_csv
_exec_plan
+ _acero
_feather
_fs
_hdfsio
diff --git a/python/pyarrow/_compute.pxd b/python/pyarrow/_acero.pxd
similarity index 54%
copy from python/pyarrow/_compute.pxd
copy to python/pyarrow/_acero.pxd
index ee348e9816..84d90b24db 100644
--- a/python/pyarrow/_compute.pxd
+++ b/python/pyarrow/_acero.pxd
@@ -21,44 +21,23 @@ from pyarrow.lib cimport *
from pyarrow.includes.common cimport *
from pyarrow.includes.libarrow cimport *
-cdef class ScalarUdfContext(_Weakrefable):
- cdef:
- CScalarUdfContext c_context
-
- cdef void init(self, const CScalarUdfContext& c_context)
-
-cdef class FunctionOptions(_Weakrefable):
+cdef class ExecNodeOptions(_Weakrefable):
cdef:
- shared_ptr[CFunctionOptions] wrapped
-
- cdef const CFunctionOptions* get_options(self) except NULL
- cdef void init(self, const shared_ptr[CFunctionOptions]& sp)
-
- cdef inline shared_ptr[CFunctionOptions] unwrap(self)
-
+ shared_ptr[CExecNodeOptions] wrapped
-cdef class _SortOptions(FunctionOptions):
- pass
+ cdef void init(self, const shared_ptr[CExecNodeOptions]& sp)
+ cdef inline shared_ptr[CExecNodeOptions] unwrap(self) nogil
-cdef CExpression _bind(Expression filter, Schema schema) except *
-
-
-cdef class Expression(_Weakrefable):
+cdef class Declaration(_Weakrefable):
cdef:
- CExpression expr
-
- cdef void init(self, const CExpression& sp)
+ CDeclaration decl
- @staticmethod
- cdef wrap(const CExpression& sp)
-
- cdef inline CExpression unwrap(self)
+ cdef void init(self, const CDeclaration& c_decl)
@staticmethod
- cdef Expression _expr_or_scalar(object expr)
-
+ cdef wrap(const CDeclaration& c_decl)
-cdef CExpression _true
+ cdef inline CDeclaration unwrap(self) nogil
diff --git a/python/pyarrow/_acero.pyx b/python/pyarrow/_acero.pyx
new file mode 100644
index 0000000000..91f3ef6712
--- /dev/null
+++ b/python/pyarrow/_acero.pyx
@@ -0,0 +1,477 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# ---------------------------------------------------------------------
+# Low-level Acero bindings
+
+# cython: profile=False
+# distutils: language = c++
+# cython: language_level = 3
+
+from pyarrow.includes.common cimport *
+from pyarrow.includes.libarrow cimport *
+from pyarrow.includes.libarrow_dataset cimport *
+from pyarrow.lib cimport (Table, check_status, pyarrow_unwrap_table, pyarrow_wrap_table,
+ RecordBatchReader)
+from pyarrow.lib import frombytes, tobytes
+from pyarrow._compute cimport Expression, FunctionOptions, _ensure_field_ref, _true
+from pyarrow.compute import field
+
+
+cdef class ExecNodeOptions(_Weakrefable):
+ __slots__ = () # avoid mistakenly creating attributes
+
+ cdef void init(self, const shared_ptr[CExecNodeOptions]& sp):
+ self.wrapped = sp
+
+ cdef inline shared_ptr[CExecNodeOptions] unwrap(self) nogil:
+ return self.wrapped
+
+
+cdef class _TableSourceNodeOptions(ExecNodeOptions):
+
+ def _set_options(self, Table table):
+ cdef:
+ shared_ptr[CTable] c_table
+
+ c_table = pyarrow_unwrap_table(table)
+ self.wrapped.reset(
+ new CTableSourceNodeOptions(c_table)
+ )
+
+
+class TableSourceNodeOptions(_TableSourceNodeOptions):
+ """
+ A Source node which accepts a table.
+
+ This is the option class for the "table_source" node factory.
+
+ Parameters
+ ----------
+ table : pyarrow.Table
+ The table which acts as the data source.
+ """
+
+ def __init__(self, Table table):
+ self._set_options(table)
+
+
+cdef class _FilterNodeOptions(ExecNodeOptions):
+
+ def _set_options(self, Expression filter_expression not None):
+ self.wrapped.reset(
+ new CFilterNodeOptions(<CExpression>filter_expression.unwrap())
+ )
+
+
+class FilterNodeOptions(_FilterNodeOptions):
+ """
+ Make a node which excludes some rows from batches passed through it.
+
+ This is the option class for the "filter" node factory.
+
+ The "filter" operation provides an option to define data filtering
+ criteria. It selects rows where the given expression evaluates to true.
+ Filters can be written using pyarrow.compute.Expression, and the
+ expression must have a return type of boolean.
+
+ Parameters
+ ----------
+ filter_expression : pyarrow.compute.Expression
+ """
+
+ def __init__(self, Expression filter_expression):
+ self._set_options(filter_expression)
+
+
+cdef class _ProjectNodeOptions(ExecNodeOptions):
+
+ def _set_options(self, expressions, names=None):
+ cdef:
+ Expression expr
+ vector[CExpression] c_expressions
+ vector[c_string] c_names
+
+ for expr in expressions:
+ c_expressions.push_back(expr.unwrap())
+
+ if names is not None:
+ if len(names) != len(expressions):
+ raise ValueError(
+ "The number of names should be equal to the number of expressions"
+ )
+
+ for name in names:
+ c_names.push_back(<c_string>tobytes(name))
+
+ self.wrapped.reset(
+ new CProjectNodeOptions(c_expressions, c_names)
+ )
+ else:
+ self.wrapped.reset(
+ new CProjectNodeOptions(c_expressions)
+ )
+
+
+class ProjectNodeOptions(_ProjectNodeOptions):
+ """
+ Make a node which executes expressions on input batches,
+ producing batches of the same length with new columns.
+
+ This is the option class for the "project" node factory.
+
+ The "project" operation rearranges, deletes, transforms, and
+ creates columns. Each output column is computed by evaluating
+ an expression against the source record batch. These must be
+ scalar expressions (expressions consisting of scalar literals,
+ field references and scalar functions, i.e. elementwise functions
+ that return one value for each input row independent of the value
+ of all other rows).
+
+ Parameters
+ ----------
+ expressions : list of pyarrow.compute.Expression
+ List of expressions to evaluate against the source batch. These must
+ be scalar expressions.
+ names : list of str, optional
+ List of names for each of the output columns (same length as
+ `expressions`). If `names` is not provided, the string
+ representations of exprs will be used.
+ """
+
+ def __init__(self, expressions, names=None):
+ self._set_options(expressions, names)
+
+
+cdef class _AggregateNodeOptions(ExecNodeOptions):
+
+ def _set_options(self, aggregates, keys=None):
+ cdef:
+ CAggregate c_aggr
+ vector[CAggregate] c_aggregations
+ vector[CFieldRef] c_keys
+
+ for arg_names, func_name, opts, name in aggregates:
+ c_aggr.function = tobytes(func_name)
+ if opts is not None:
+ c_aggr.options = (<FunctionOptions?>opts).wrapped
+ else:
+ c_aggr.options = <shared_ptr[CFunctionOptions]>nullptr
+ if not isinstance(arg_names, (list, tuple)):
+ arg_names = [arg_names]
+ for arg in arg_names:
+ c_aggr.target.push_back(_ensure_field_ref(arg))
+ c_aggr.name = tobytes(name)
+
+ c_aggregations.push_back(move(c_aggr))
+
+ if keys is None:
+ keys = []
+ for name in keys:
+ c_keys.push_back(_ensure_field_ref(name))
+
+ self.wrapped.reset(
+ new CAggregateNodeOptions(c_aggregations, c_keys)
+ )
+
+
+class AggregateNodeOptions(_AggregateNodeOptions):
+ """
+ Make a node which aggregates input batches, optionally grouped by keys.
+
+ This is the option class for the "aggregate" node factory.
+
+ Acero supports two types of aggregates: "scalar" aggregates,
+ and "hash" aggregates. Scalar aggregates reduce an array or scalar
+ input to a single scalar output (e.g. computing the mean of a column).
+ Hash aggregates act like GROUP BY in SQL and first partition data
+ based on one or more key columns, then reduce the data in each partition.
+ The aggregate node supports both types of computation, and can compute
+ any number of aggregations at once.
+
+ Parameters
+ ----------
+ aggregates : list of tuples
+ Aggregations which will be applied to the targeted fields.
+ Specified as a list of tuples, where each tuple is one aggregation
+ specification and consists of: aggregation target column(s) followed
+ by function name, aggregation function options object and the
+ output field name.
+ The target column(s) specification can be a single field reference,
+ an empty list or a list of fields for unary, nullary and n-ary aggregation
+ functions respectively. Each field reference can be a string
+ column name or expression.
+ keys : list of field references, optional
+ Keys by which aggregations will be grouped. Each key can reference
+ a field using a string name or expression.
+ """
+
+ def __init__(self, aggregates, keys=None):
+ self._set_options(aggregates, keys)
+
+
+cdef class _HashJoinNodeOptions(ExecNodeOptions):
+
+ def _set_options(
+ self, join_type, left_keys, right_keys, left_output=None, right_output=None,
+ output_suffix_for_left="", output_suffix_for_right="",
+ ):
+ cdef:
+ CJoinType c_join_type
+ vector[CFieldRef] c_left_keys
+ vector[CFieldRef] c_right_keys
+ vector[CFieldRef] c_left_output
+ vector[CFieldRef] c_right_output
+
+ # join type
+ if join_type == "left semi":
+ c_join_type = CJoinType_LEFT_SEMI
+ elif join_type == "right semi":
+ c_join_type = CJoinType_RIGHT_SEMI
+ elif join_type == "left anti":
+ c_join_type = CJoinType_LEFT_ANTI
+ elif join_type == "right anti":
+ c_join_type = CJoinType_RIGHT_ANTI
+ elif join_type == "inner":
+ c_join_type = CJoinType_INNER
+ elif join_type == "left outer":
+ c_join_type = CJoinType_LEFT_OUTER
+ elif join_type == "right outer":
+ c_join_type = CJoinType_RIGHT_OUTER
+ elif join_type == "full outer":
+ c_join_type = CJoinType_FULL_OUTER
+ else:
+ raise ValueError("Unsupported join type")
+
+ # left/right keys
+ if not isinstance(left_keys, (list, tuple)):
+ left_keys = [left_keys]
+ for key in left_keys:
+ c_left_keys.push_back(_ensure_field_ref(key))
+ if not isinstance(right_keys, (list, tuple)):
+ right_keys = [right_keys]
+ for key in right_keys:
+ c_right_keys.push_back(_ensure_field_ref(key))
+
+ # left/right output fields
+ if left_output is not None and right_output is not None:
+ for colname in left_output:
+ c_left_output.push_back(_ensure_field_ref(colname))
+ for colname in right_output:
+ c_right_output.push_back(_ensure_field_ref(colname))
+
+ self.wrapped.reset(
+ new CHashJoinNodeOptions(
+ c_join_type, c_left_keys, c_right_keys,
+ c_left_output, c_right_output,
+ _true,
+ <c_string>tobytes(output_suffix_for_left),
+ <c_string>tobytes(output_suffix_for_right)
+ )
+ )
+ else:
+ self.wrapped.reset(
+ new CHashJoinNodeOptions(
+ c_join_type, c_left_keys, c_right_keys,
+ _true,
+ <c_string>tobytes(output_suffix_for_left),
+ <c_string>tobytes(output_suffix_for_right)
+ )
+ )
+
+
+class HashJoinNodeOptions(_HashJoinNodeOptions):
+ """
+ Make a node which implements join operation using hash join strategy.
+
+ This is the option class for the "hashjoin" node factory.
+
+ Parameters
+ ----------
+ join_type : str
+ Type of join. One of "left semi", "right semi", "left anti",
+ "right anti", "inner", "left outer", "right outer", "full outer".
+ left_keys : str, Expression or list
+ Key fields from left input. Each key can be a string column name
+ or a field expression, or a list of such field references.
+ right_keys : str, Expression or list
+ Key fields from right input. See `left_keys` for details.
+ left_output : list, optional
+ List of output fields passed from left input. If left and right
+ output fields are not specified, all valid fields from both left and
+ right input will be output. Each field can be a string column name
+ or a field expression.
+ right_output : list, optional
+ List of output fields passed from right input. If left and right
+ output fields are not specified, all valid fields from both left and
+ right input will be output. Each field can be a string column name
+ or a field expression.
+ output_suffix_for_left : str
+ Suffix added to names of output fields coming from left input
+ (used to distinguish, if necessary, between fields of the same
+ name in left and right input and can be left empty if there are
+ no name collisions).
+ output_suffix_for_right : str
+ Suffix added to names of output fields coming from right input,
+ see `output_suffix_for_left` for details.
+ """
+
+ def __init__(
+ self, join_type, left_keys, right_keys, left_output=None, right_output=None,
+ output_suffix_for_left="", output_suffix_for_right=""
+ ):
+ self._set_options(
+ join_type, left_keys, right_keys, left_output, right_output,
+ output_suffix_for_left, output_suffix_for_right
+ )
+
+
+cdef class Declaration(_Weakrefable):
+ """
+ Helper class for declaring the nodes of an ExecPlan.
+
+ A Declaration represents an unconstructed ExecNode, and potentially
+ a chain of nodes, since its inputs may themselves be Declarations and
+ declarations can be combined with ``from_sequence``.
+
+ The available ExecNodes are registered under a "factory name", and a
+ node is specified using that name together with the matching
+ ExecNodeOptions subclass.
+
+ Parameters
+ ----------
+ factory_name : str
+ The ExecNode factory name, such as "table_source", "filter",
+ "project" etc. See the ExecNodeOptions subclasses for the exact
+ factory names to use.
+ options : ExecNodeOptions
+ Corresponding ExecNodeOptions subclass (matching the factory name).
+ inputs : list of Declaration, optional
+ Input nodes for this declaration. Optional if the node is a source
+ node, or when the declaration gets combined later with
+ ``from_sequence``.
+
+ Returns
+ -------
+ Declaration
+ """
+ cdef void init(self, const CDeclaration& c_decl):
+ self.decl = c_decl
+
+ @staticmethod
+ cdef wrap(const CDeclaration& c_decl):
+ cdef Declaration self = Declaration.__new__(Declaration)
+ self.init(c_decl)
+ return self
+
+ cdef inline CDeclaration unwrap(self) nogil:
+ return self.decl
+
+ def __init__(self, factory_name, ExecNodeOptions options, inputs=None):
+ cdef:
+ c_string c_factory_name
+ CDeclaration c_decl
+ vector[CDeclaration.Input] c_inputs
+
+ c_factory_name = tobytes(factory_name)
+
+ if inputs is not None:
+ for ipt in inputs:
+ c_inputs.push_back(
+ CDeclaration.Input((<Declaration>ipt).unwrap())
+ )
+
+ c_decl = CDeclaration(c_factory_name, c_inputs, options.unwrap())
+ self.init(c_decl)
+
+ @staticmethod
+ def from_sequence(decls):
+ """
+ Convenience factory for the common case of a simple sequence of nodes.
+
+ Each of the declarations will be appended to the inputs of the
+ subsequent declaration, and the final modified declaration will
+ be returned.
+
+ Parameters
+ ----------
+ decls : list of Declaration
+
+ Returns
+ -------
+ Declaration
+ """
+ cdef:
+ vector[CDeclaration] c_decls
+ CDeclaration c_decl
+
+ for decl in decls:
+ c_decls.push_back((<Declaration> decl).unwrap())
+
+ c_decl = CDeclaration.Sequence(c_decls)
+ return Declaration.wrap(c_decl)
+
+ def __str__(self):
+ return frombytes(GetResultValue(DeclarationToString(self.decl)))
+
+ def __repr__(self):
+ return "<pyarrow.acero.Declaration>\n{0}".format(str(self))
+
+ def to_table(self, bint use_threads=True):
+ """
+ Run the declaration and collect the results into a table.
+
+ This method will implicitly add a sink node to the declaration
+ to collect results into a table. It will then create an ExecPlan
+ from the declaration, start the exec plan, block until the plan
+ has finished, and return the created table.
+
+ Parameters
+ ----------
+ use_threads : bool, default True
+ If set to False, then all CPU work will be done on the calling
+ thread. I/O tasks will still happen on the I/O executor
+ and may be multi-threaded (but should not use significant CPU
+ resources).
+
+ Returns
+ -------
+ pyarrow.Table
+ """
+ cdef:
+ shared_ptr[CTable] c_table
+
+ with nogil:
+ c_table = GetResultValue(DeclarationToTable(self.unwrap(), use_threads))
+ return pyarrow_wrap_table(c_table)
+
+ def to_reader(self, bint use_threads=True):
+ """Run the declaration and return results as a RecordBatchReader.
+
+ For details about the parameters, see `to_table`.
+
+ Returns
+ -------
+ pyarrow.RecordBatchReader
+ """
+ cdef:
+ RecordBatchReader reader
+ reader = RecordBatchReader.__new__(RecordBatchReader)
+ reader.reader.reset(
+ GetResultValue(DeclarationToReader(self.unwrap(), use_threads)).release()
+ )
+ return reader
diff --git a/python/pyarrow/_compute.pxd b/python/pyarrow/_compute.pxd
index ee348e9816..da195f9d63 100644
--- a/python/pyarrow/_compute.pxd
+++ b/python/pyarrow/_compute.pxd
@@ -62,3 +62,5 @@ cdef class Expression(_Weakrefable):
cdef CExpression _true
+
+cdef CFieldRef _ensure_field_ref(value) except *
diff --git a/python/pyarrow/_compute.pyx b/python/pyarrow/_compute.pyx
index c726b08449..b0e3026b6f 100644
--- a/python/pyarrow/_compute.pyx
+++ b/python/pyarrow/_compute.pyx
@@ -1395,38 +1395,43 @@ class MakeStructOptions(_MakeStructOptions):
self._set_options(field_names, field_nullability, field_metadata)
+cdef CFieldRef _ensure_field_ref(value) except *:
+ cdef:
+ CFieldRef field_ref
+ const CFieldRef* field_ref_ptr
+
+ if isinstance(value, (list, tuple)):
+ value = Expression._nested_field(tuple(value))
+
+ if isinstance(value, Expression):
+ field_ref_ptr = (<Expression>value).unwrap().field_ref()
+ if field_ref_ptr is NULL:
+ raise ValueError("Unable to get FieldRef from Expression")
+ field_ref = <CFieldRef>deref(field_ref_ptr)
+ elif isinstance(value, (bytes, str)):
+ if value.startswith(b'.' if isinstance(value, bytes) else '.'):
+ field_ref = GetResultValue(
+ CFieldRef.FromDotPath(<c_string>tobytes(value)))
+ else:
+ field_ref = CFieldRef(<c_string>tobytes(value))
+ elif isinstance(value, int):
+ field_ref = CFieldRef(<int> value)
+ else:
+ raise TypeError("Expected a field reference as a str or int, list of "
+ f"str or int, or Expression. Got {type(value)} instead.")
+ return field_ref
+
+
cdef class _StructFieldOptions(FunctionOptions):
def _set_options(self, indices):
- cdef:
- CFieldRef field_ref
- const CFieldRef* field_ref_ptr
- if isinstance(indices, (list, tuple)):
- if len(indices):
- indices = Expression._nested_field(tuple(indices))
- else:
- # Allow empty indices; effecitively return same array
- self.wrapped.reset(
- new CStructFieldOptions(<vector[int]>indices))
- return
-
- if isinstance(indices, Expression):
- field_ref_ptr = (<Expression>indices).unwrap().field_ref()
- if field_ref_ptr is NULL:
- raise ValueError("Unable to get CFieldRef from Expression")
- field_ref = <CFieldRef>deref(field_ref_ptr)
- elif isinstance(indices, (bytes, str)):
- if indices.startswith(b'.' if isinstance(indices, bytes) else '.'):
- field_ref = GetResultValue(
- CFieldRef.FromDotPath(<c_string>tobytes(indices)))
- else:
- field_ref = CFieldRef(<c_string>tobytes(indices))
- elif isinstance(indices, int):
- field_ref = CFieldRef(<int> indices)
- else:
- raise TypeError("Expected List[str], List[int], List[bytes], "
- "Expression, bytes, str, or int. "
- f"Got: {type(indices)}")
+ if isinstance(indices, (list, tuple)) and not len(indices):
+ # Allow empty indices; effectively return same array
+ self.wrapped.reset(
+ new CStructFieldOptions(<vector[int]>indices))
+ return
+
+ cdef CFieldRef field_ref = _ensure_field_ref(indices)
self.wrapped.reset(new CStructFieldOptions(field_ref))
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index 4725ae8f44..295d89d351 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -2583,6 +2583,7 @@ cdef extern from "arrow/compute/exec/options.h" namespace "arrow::compute" nogil
pass
cdef cppclass CTableSourceNodeOptions "arrow::compute::TableSourceNodeOptions"(CExecNodeOptions):
+ CTableSourceNodeOptions(shared_ptr[CTable] table)
CTableSourceNodeOptions(shared_ptr[CTable] table, int64_t max_batch_size)
cdef cppclass CSinkNodeOptions "arrow::compute::SinkNodeOptions"(CExecNodeOptions):
@@ -2596,6 +2597,9 @@ cdef extern from "arrow/compute/exec/options.h" namespace "arrow::compute" nogil
CProjectNodeOptions(vector[CExpression] expressions, vector[c_string] names)
+ cdef cppclass CAggregateNodeOptions "arrow::compute::AggregateNodeOptions"(CExecNodeOptions):
+ CAggregateNodeOptions(vector[CAggregate] aggregates, vector[CFieldRef] names)
+
cdef cppclass COrderBySinkNodeOptions "arrow::compute::OrderBySinkNodeOptions"(CExecNodeOptions):
COrderBySinkNodeOptions(vector[CSortOptions] options, CAsyncExecBatchGenerator generator)
@@ -2666,6 +2670,19 @@ cdef extern from "arrow/compute/exec/exec_plan.h" namespace "arrow::compute" nog
vector[CExecNode*] inputs,
const CExecNodeOptions& options)
+ CResult[shared_ptr[CTable]] DeclarationToTable(
+ CDeclaration declaration, c_bool use_threads
+ )
+ CResult[shared_ptr[CTable]] DeclarationToTable(
+ CDeclaration declaration, c_bool use_threads,
+ CMemoryPool* memory_pool, CFunctionRegistry* function_registry
+ )
+ CResult[unique_ptr[CRecordBatchReader]] DeclarationToReader(
+ CDeclaration declaration, c_bool use_threads
+ )
+
+ CResult[c_string] DeclarationToString(const CDeclaration& declaration)
+
cdef extern from "arrow/extension_type.h" namespace "arrow":
cdef cppclass CExtensionTypeRegistry" arrow::ExtensionTypeRegistry":
diff --git a/python/pyarrow/tests/test_acero.py b/python/pyarrow/tests/test_acero.py
new file mode 100644
index 0000000000..4cba55f7e4
--- /dev/null
+++ b/python/pyarrow/tests/test_acero.py
@@ -0,0 +1,293 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pytest
+
+import pyarrow as pa
+import pyarrow.compute as pc
+from pyarrow.compute import field
+
+from pyarrow._acero import (
+ TableSourceNodeOptions,
+ FilterNodeOptions,
+ ProjectNodeOptions,
+ AggregateNodeOptions,
+ HashJoinNodeOptions,
+ Declaration,
+)
+
+
[email protected]
+def table_source():
+ table = pa.table({'a': [1, 2, 3], 'b': [4, 5, 6]})
+ table_opts = TableSourceNodeOptions(table)
+ table_source = Declaration("table_source", options=table_opts)
+ return table_source
+
+
+def test_declaration():
+
+ table = pa.table({'a': [1, 2, 3], 'b': [4, 5, 6]})
+ table_opts = TableSourceNodeOptions(table)
+ filter_opts = FilterNodeOptions(field('a') > 1)
+
+ # using sequence
+ decl = Declaration.from_sequence([
+ Declaration("table_source", options=table_opts),
+ Declaration("filter", options=filter_opts)
+ ])
+ result = decl.to_table()
+ assert result.equals(table.slice(1, 2))
+
+ # using explicit inputs
+ table_source = Declaration("table_source", options=table_opts)
+ filtered = Declaration("filter", options=filter_opts, inputs=[table_source])
+ result = filtered.to_table()
+ assert result.equals(table.slice(1, 2))
+
+
+def test_declaration_repr(table_source):
+
+ assert "TableSourceNode" in str(table_source)
+ assert "TableSourceNode" in repr(table_source)
+
+
+def test_declaration_to_reader(table_source):
+ with table_source.to_reader() as reader:
+ assert reader.schema == pa.schema([("a", pa.int64()), ("b", pa.int64())])
+ result = reader.read_all()
+ expected = pa.table({'a': [1, 2, 3], 'b': [4, 5, 6]})
+ assert result.equals(expected)
+
+
+def test_table_source():
+ with pytest.raises(TypeError):
+ TableSourceNodeOptions(pa.record_batch([pa.array([1, 2, 3])], ["a"]))
+
+ table_source = TableSourceNodeOptions(None)
+ decl = Declaration("table_source", table_source)
+ with pytest.raises(
+ ValueError, match="TableSourceNode requires table which is not null"
+ ):
+ _ = decl.to_table()
+
+
+def test_filter(table_source):
+ # referencing unknown field
+ decl = Declaration.from_sequence([
+ table_source,
+ Declaration("filter", options=FilterNodeOptions(field("c") > 1))
+ ])
+ with pytest.raises(ValueError, match=r"No match for FieldRef.Name\(c\)"):
+ _ = decl.to_table()
+
+ # requires a pyarrow Expression
+ with pytest.raises(TypeError):
+ FilterNodeOptions(pa.array([True, False, True]))
+ with pytest.raises(TypeError):
+ FilterNodeOptions(None)
+
+
+def test_project(table_source):
+ # default name from expression
+ decl = Declaration.from_sequence([
+ table_source,
+ Declaration("project", ProjectNodeOptions([pc.multiply(field("a"), 2)]))
+ ])
+ result = decl.to_table()
+ assert result.schema.names == ["multiply(a, 2)"]
+ assert result[0].to_pylist() == [2, 4, 6]
+
+ # provide name
+ decl = Declaration.from_sequence([
+ table_source,
+ Declaration("project", ProjectNodeOptions([pc.multiply(field("a"), 2)], ["a2"]))
+ ])
+ result = decl.to_table()
+ assert result.schema.names == ["a2"]
+ assert result["a2"].to_pylist() == [2, 4, 6]
+
+ # input validation
+ with pytest.raises(ValueError):
+ ProjectNodeOptions([pc.multiply(field("a"), 2)], ["a2", "b2"])
+
+ # no scalar expression
+ decl = Declaration.from_sequence([
+ table_source,
+ Declaration("project", ProjectNodeOptions([pc.sum(field("a"))]))
+ ])
+ with pytest.raises(ValueError, match="cannot Execute non-scalar expression"):
+ _ = decl.to_table()
+
+
+def test_aggregate_scalar(table_source):
+ decl = Declaration.from_sequence([
+ table_source,
+ Declaration("aggregate", AggregateNodeOptions([("a", "sum", None, "a_sum")]))
+ ])
+ result = decl.to_table()
+ assert result.schema.names == ["a_sum"]
+ assert result["a_sum"].to_pylist() == [6]
+
+ # with options class
+ table = pa.table({'a': [1, 2, None]})
+ aggr_opts = AggregateNodeOptions(
+ [("a", "sum", pc.ScalarAggregateOptions(skip_nulls=False), "a_sum")]
+ )
+ decl = Declaration.from_sequence([
+ Declaration("table_source", TableSourceNodeOptions(table)),
+ Declaration("aggregate", aggr_opts),
+ ])
+ result = decl.to_table()
+ assert result.schema.names == ["a_sum"]
+ assert result["a_sum"].to_pylist() == [None]
+
+ # test various ways of specifying the target column
+ for target in ["a", field("a"), 0, field(0), ["a"], [field("a")], [0]]:
+ aggr_opts = AggregateNodeOptions([(target, "sum", None, "a_sum")])
+ decl = Declaration.from_sequence(
+ [table_source, Declaration("aggregate", aggr_opts)]
+ )
+ result = decl.to_table()
+ assert result.schema.names == ["a_sum"]
+ assert result["a_sum"].to_pylist() == [6]
+
+ # proper error when specifying the wrong number of target columns
+ aggr_opts = AggregateNodeOptions([(["a", "b"], "sum", None, "a_sum")])
+ decl = Declaration.from_sequence(
+ [table_source, Declaration("aggregate", aggr_opts)]
+ )
+ with pytest.raises(
+ ValueError, match="Function 'sum' accepts 1 arguments but 2 passed"
+ ):
+ _ = decl.to_table()
+
+ # proper error when using hash aggregation without keys
+ aggr_opts = AggregateNodeOptions([("a", "hash_sum", None, "a_sum")])
+ decl = Declaration.from_sequence(
+ [table_source, Declaration("aggregate", aggr_opts)]
+ )
+ with pytest.raises(ValueError, match="is a hash aggregate function"):
+ _ = decl.to_table()
+
+
+def test_aggregate_hash():
+ table = pa.table({'a': [1, 2, None], 'b': ["foo", "bar", "foo"]})
+ table_opts = TableSourceNodeOptions(table)
+ table_source = Declaration("table_source", options=table_opts)
+
+ # default options
+ aggr_opts = AggregateNodeOptions(
+ [("a", "hash_count", None, "count(a)")], keys=["b"])
+ decl = Declaration.from_sequence([
+ table_source, Declaration("aggregate", aggr_opts)
+ ])
+ result = decl.to_table()
+ expected = pa.table({"count(a)": [1, 1], "b": ["foo", "bar"]})
+ assert result.equals(expected)
+
+ # specify function options
+ aggr_opts = AggregateNodeOptions(
+ [("a", "hash_count", pc.CountOptions("all"), "count(a)")], keys=["b"]
+ )
+ decl = Declaration.from_sequence([
+ table_source, Declaration("aggregate", aggr_opts)
+ ])
+ result = decl.to_table()
+ expected_all = pa.table({"count(a)": [2, 1], "b": ["foo", "bar"]})
+ assert result.equals(expected_all)
+
+ # specify keys as field references
+ aggr_opts = AggregateNodeOptions(
+ [("a", "hash_count", None, "count(a)")], keys=[field("b")]
+ )
+ decl = Declaration.from_sequence([
+ table_source, Declaration("aggregate", aggr_opts)
+ ])
+ result = decl.to_table()
+ assert result.equals(expected)
+
+ # wrong type of (aggregation) function
+ # TODO test with kernel that matches number of arguments (arity) -> avoid segfault
+ aggr_opts = AggregateNodeOptions([("a", "sum", None, "a_sum")], keys=["b"])
+ decl = Declaration.from_sequence([
+ table_source, Declaration("aggregate", aggr_opts)
+ ])
+ with pytest.raises(ValueError):
+ _ = decl.to_table()
+
+
+def test_hash_join():
+ left = pa.table({'key': [1, 2, 3], 'a': [4, 5, 6]})
+ left_source = Declaration("table_source", options=TableSourceNodeOptions(left))
+ right = pa.table({'key': [2, 3, 4], 'b': [4, 5, 6]})
+ right_source = Declaration("table_source", options=TableSourceNodeOptions(right))
+
+ # inner join
+ join_opts = HashJoinNodeOptions("inner", left_keys="key", right_keys="key")
+ joined = Declaration(
+ "hashjoin", options=join_opts, inputs=[left_source, right_source])
+ result = joined.to_table()
+ expected = pa.table(
+ [[2, 3], [5, 6], [2, 3], [4, 5]],
+ names=["key", "a", "key", "b"])
+ assert result.equals(expected)
+
+ for keys in [field("key"), ["key"], [field("key")]]:
+ join_opts = HashJoinNodeOptions("inner", left_keys=keys, right_keys=keys)
+ joined = Declaration(
+ "hashjoin", options=join_opts, inputs=[left_source, right_source])
+ result = joined.to_table()
+ assert result.equals(expected)
+
+ # left join
+ join_opts = HashJoinNodeOptions(
+ "left outer", left_keys="key", right_keys="key")
+ joined = Declaration(
+ "hashjoin", options=join_opts, inputs=[left_source, right_source])
+ result = joined.to_table()
+ expected = pa.table(
+ [[1, 2, 3], [4, 5, 6], [None, 2, 3], [None, 4, 5]],
+ names=["key", "a", "key", "b"]
+ )
+ assert result.sort_by("a").equals(expected)
+
+ # suffixes
+ join_opts = HashJoinNodeOptions(
+ "left outer", left_keys="key", right_keys="key",
+ output_suffix_for_left="_left", output_suffix_for_right="_right")
+ joined = Declaration(
+ "hashjoin", options=join_opts, inputs=[left_source, right_source])
+ result = joined.to_table()
+ expected = pa.table(
+ [[1, 2, 3], [4, 5, 6], [None, 2, 3], [None, 4, 5]],
+ names=["key_left", "a", "key_right", "b"]
+ )
+ assert result.sort_by("a").equals(expected)
+
+ # manually specifying output columns
+ join_opts = HashJoinNodeOptions(
+ "left outer", left_keys="key", right_keys="key",
+ left_output=["key", "a"], right_output=[field("b")])
+ joined = Declaration(
+ "hashjoin", options=join_opts, inputs=[left_source, right_source])
+ result = joined.to_table()
+ expected = pa.table(
+ [[1, 2, 3], [4, 5, 6], [None, 4, 5]],
+ names=["key", "a", "b"]
+ )
+ assert result.sort_by("a").equals(expected)
diff --git a/python/setup.py b/python/setup.py
index 4ee397ed4b..7598133206 100755
--- a/python/setup.py
+++ b/python/setup.py
@@ -213,6 +213,7 @@ class build_ext(_build_ext):
'_dataset_orc',
'_dataset_parquet',
'_exec_plan',
+ '_acero',
'_feather',
'_parquet',
'_parquet_encryption',