This is an automated email from the ASF dual-hosted git repository.

jorisvandenbossche pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new a3cd962b8b GH-33976: [Python] Initial bindings for acero Declaration and ExecNodeOptions classes (#34102)
a3cd962b8b is described below

commit a3cd962b8bc5c49f33824202dff382af13cfa873
Author: Joris Van den Bossche <[email protected]>
AuthorDate: Fri Mar 3 13:46:53 2023 +0100

    GH-33976: [Python] Initial bindings for acero Declaration and ExecNodeOptions classes (#34102)
    
    First step for GH-33976, adding basic bindings for the different ExecNodeOptions classes and the Declaration class to combine those in a query.
    
    Some notes on what is and what is not included in this PR:
    
    * For source nodes, didn't expose the generic `SourceNodeOptions` et al, only the concrete `TableSourceNodeOptions` (should probably also add `RecordBatchReaderSourceNodeOptions`)
    * Didn't yet expose any sink nodes. The table sink is implicitly used by `Declaration.to_table()`, and given that there is currently no explicit API to manually convert to an ExecPlan and execute it, explicit table sink node bindings didn't seem necessary.
    * Also didn't yet expose the order_by sink node, because this requires a custom sink when collecting as a Table, and it's not directly clear how this is possible with the Declaration interface. This requires https://github.com/apache/arrow/issues/34248 to be fixed first.
    * Leaving dataset-based scan and write nodes for a follow-up PR
    * Basic class for `Declaration` with a `to_table` method to execute the plan and consume it into a Table, and a `to_reader()` method to get a RecordBatchReader (could also further add a `to_batches()` method); a usage sketch follows below.
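
    As a usage sketch (mirroring the tests added in this PR; note that the
    imports come from the internal `pyarrow._acero` module, since no public
    `pyarrow.acero` wrapper module is added yet):

    ```python
    import pyarrow as pa
    import pyarrow.compute as pc
    from pyarrow.compute import field
    from pyarrow._acero import (
        Declaration, TableSourceNodeOptions, FilterNodeOptions, ProjectNodeOptions
    )

    table = pa.table({'a': [1, 2, 3], 'b': [4, 5, 6]})

    # declare a table_source -> filter -> project pipeline ...
    decl = Declaration.from_sequence([
        Declaration("table_source", TableSourceNodeOptions(table)),
        Declaration("filter", FilterNodeOptions(field("a") > 1)),
        Declaration("project", ProjectNodeOptions([pc.multiply(field("a"), 2)], ["a2"])),
    ])
    # ... and execute it into a pyarrow.Table (here a single column "a2" = [4, 6])
    result = decl.to_table()
    ```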
    
    --
    
    * Issue: #33976
    
    Lead-authored-by: Joris Van den Bossche <[email protected]>
    Co-authored-by: Weston Pace <[email protected]>
    Signed-off-by: Joris Van den Bossche <[email protected]>
---
 cpp/src/arrow/compute/exec/options.h        |   3 +-
 docs/source/cpp/streaming_execution.rst     |  12 +-
 python/CMakeLists.txt                       |   1 +
 python/pyarrow/{_compute.pxd => _acero.pxd} |  39 +--
 python/pyarrow/_acero.pyx                   | 477 ++++++++++++++++++++++++++++
 python/pyarrow/_compute.pxd                 |   2 +
 python/pyarrow/_compute.pyx                 |  63 ++--
 python/pyarrow/includes/libarrow.pxd        |  17 +
 python/pyarrow/tests/test_acero.py          | 293 +++++++++++++++++
 python/setup.py                             |   1 +
 10 files changed, 844 insertions(+), 64 deletions(-)

diff --git a/cpp/src/arrow/compute/exec/options.h b/cpp/src/arrow/compute/exec/options.h
index bc37144a3c..dc754f7004 100644
--- a/cpp/src/arrow/compute/exec/options.h
+++ b/cpp/src/arrow/compute/exec/options.h
@@ -204,7 +204,8 @@ class ARROW_EXPORT FetchNodeOptions : public ExecNodeOptions {
   int64_t count;
 };
 
-/// \brief Make a node which executes expressions on input batches, producing new batches.
+/// \brief Make a node which executes expressions on input batches, producing batches
+/// of the same length with new columns.
 ///
 /// Each expression will be evaluated against each batch which is pushed to
 /// this node to produce a corresponding output column.
diff --git a/docs/source/cpp/streaming_execution.rst b/docs/source/cpp/streaming_execution.rst
index ffe4fccbd2..49cc9b7fb7 100644
--- a/docs/source/cpp/streaming_execution.rst
+++ b/docs/source/cpp/streaming_execution.rst
@@ -648,8 +648,9 @@ Example of using ``table_source``
 ----------
 
 ``filter`` operation, as the name suggests, provides an option to define data filtering
-criteria. It selects rows matching a given expression. Filters can be written using
-:class:`arrow::compute::Expression`. For example, if we wish to keep rows where the value
+criteria. It selects rows where the given expression evaluates to true. Filters can be written using
+:class:`arrow::compute::Expression`, and the expression should have a return type of boolean.
+For example, if we wish to keep rows where the value
 of column ``b`` is greater than 3,  then we can use the following expression.
 
 Filter example:
@@ -668,8 +669,11 @@ Filter example:
 
 ``project`` operation rearranges, deletes, transforms, and creates columns.
 Each output column is computed by evaluating an expression
-against the source record batch. This is exposed via 
-:class:`arrow::compute::ProjectNodeOptions` which requires,
+against the source record batch. These must be scalar expressions
+(expressions consisting of scalar literals, field references and scalar
+functions, i.e. elementwise functions that return one value for each input
+row independent of the value of all other rows).
+This is exposed via :class:`arrow::compute::ProjectNodeOptions` which requires,
 an :class:`arrow::compute::Expression` and name for each of the output columns (if names are not
 provided, the string representations of exprs will be used).
 
diff --git a/python/CMakeLists.txt b/python/CMakeLists.txt
index a2cf8901ba..8f846348f3 100644
--- a/python/CMakeLists.txt
+++ b/python/CMakeLists.txt
@@ -525,6 +525,7 @@ set(CYTHON_EXTENSIONS
     _compute
     _csv
     _exec_plan
+    _acero
     _feather
     _fs
     _hdfsio
diff --git a/python/pyarrow/_compute.pxd b/python/pyarrow/_acero.pxd
similarity index 54%
copy from python/pyarrow/_compute.pxd
copy to python/pyarrow/_acero.pxd
index ee348e9816..84d90b24db 100644
--- a/python/pyarrow/_compute.pxd
+++ b/python/pyarrow/_acero.pxd
@@ -21,44 +21,23 @@ from pyarrow.lib cimport *
 from pyarrow.includes.common cimport *
 from pyarrow.includes.libarrow cimport *
 
-cdef class ScalarUdfContext(_Weakrefable):
-    cdef:
-        CScalarUdfContext c_context
-
-    cdef void init(self, const CScalarUdfContext& c_context)
-
 
-cdef class FunctionOptions(_Weakrefable):
+cdef class ExecNodeOptions(_Weakrefable):
     cdef:
-        shared_ptr[CFunctionOptions] wrapped
-
-    cdef const CFunctionOptions* get_options(self) except NULL
-    cdef void init(self, const shared_ptr[CFunctionOptions]& sp)
-
-    cdef inline shared_ptr[CFunctionOptions] unwrap(self)
-
+        shared_ptr[CExecNodeOptions] wrapped
 
-cdef class _SortOptions(FunctionOptions):
-    pass
+    cdef void init(self, const shared_ptr[CExecNodeOptions]& sp)
+    cdef inline shared_ptr[CExecNodeOptions] unwrap(self) nogil
 
 
-cdef CExpression _bind(Expression filter, Schema schema) except *
-
-
-cdef class Expression(_Weakrefable):
+cdef class Declaration(_Weakrefable):
 
     cdef:
-        CExpression expr
-
-    cdef void init(self, const CExpression& sp)
+        CDeclaration decl
 
-    @staticmethod
-    cdef wrap(const CExpression& sp)
-
-    cdef inline CExpression unwrap(self)
+    cdef void init(self, const CDeclaration& c_decl)
 
     @staticmethod
-    cdef Expression _expr_or_scalar(object expr)
-
+    cdef wrap(const CDeclaration& c_decl)
 
-cdef CExpression _true
+    cdef inline CDeclaration unwrap(self) nogil
diff --git a/python/pyarrow/_acero.pyx b/python/pyarrow/_acero.pyx
new file mode 100644
index 0000000000..91f3ef6712
--- /dev/null
+++ b/python/pyarrow/_acero.pyx
@@ -0,0 +1,477 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# ---------------------------------------------------------------------
+# Low-level Acero bindings
+
+# cython: profile=False
+# distutils: language = c++
+# cython: language_level = 3
+
+from pyarrow.includes.common cimport *
+from pyarrow.includes.libarrow cimport *
+from pyarrow.includes.libarrow_dataset cimport *
+from pyarrow.lib cimport (Table, check_status, pyarrow_unwrap_table, pyarrow_wrap_table,
+                          RecordBatchReader)
+from pyarrow.lib import frombytes, tobytes
+from pyarrow._compute cimport Expression, FunctionOptions, _ensure_field_ref, _true
+from pyarrow.compute import field
+
+
+cdef class ExecNodeOptions(_Weakrefable):
+    __slots__ = ()  # avoid mistakenly creating attributes
+
+    cdef void init(self, const shared_ptr[CExecNodeOptions]& sp):
+        self.wrapped = sp
+
+    cdef inline shared_ptr[CExecNodeOptions] unwrap(self) nogil:
+        return self.wrapped
+
+
+cdef class _TableSourceNodeOptions(ExecNodeOptions):
+
+    def _set_options(self, Table table):
+        cdef:
+            shared_ptr[CTable] c_table
+
+        c_table = pyarrow_unwrap_table(table)
+        self.wrapped.reset(
+            new CTableSourceNodeOptions(c_table)
+        )
+
+
+class TableSourceNodeOptions(_TableSourceNodeOptions):
+    """
+    A Source node which accepts a table.
+
+    This is the option class for the "table_source" node factory.
+
+    Parameters
+    ----------
+    table : pyarrow.Table
+        The table which acts as the data source.
+    """
+
+    def __init__(self, Table table):
+        self._set_options(table)
+
+
+cdef class _FilterNodeOptions(ExecNodeOptions):
+
+    def _set_options(self, Expression filter_expression not None):
+        self.wrapped.reset(
+            new CFilterNodeOptions(<CExpression>filter_expression.unwrap())
+        )
+
+
+class FilterNodeOptions(_FilterNodeOptions):
+    """
+    Make a node which excludes some rows from batches passed through it.
+
+    This is the option class for the "filter" node factory.
+
+    The "filter" operation provides an option to define data filtering
+    criteria. It selects rows where the given expression evaluates to true.
+    Filters can be written using pyarrow.compute.Expression, and the
+    expression must have a return type of boolean.
+
+    Parameters
+    ----------
+    filter_expression : pyarrow.compute.Expression
+    """
+
+    def __init__(self, Expression filter_expression):
+        self._set_options(filter_expression)
+
+
+cdef class _ProjectNodeOptions(ExecNodeOptions):
+
+    def _set_options(self, expressions, names=None):
+        cdef:
+            Expression expr
+            vector[CExpression] c_expressions
+            vector[c_string] c_names
+
+        for expr in expressions:
+            c_expressions.push_back(expr.unwrap())
+
+        if names is not None:
+            if len(names) != len(expressions):
+                raise ValueError(
+                    "The number of names should be equal to the number of expressions"
+                )
+
+            for name in names:
+                c_names.push_back(<c_string>tobytes(name))
+
+            self.wrapped.reset(
+                new CProjectNodeOptions(c_expressions, c_names)
+            )
+        else:
+            self.wrapped.reset(
+                new CProjectNodeOptions(c_expressions)
+            )
+
+
+class ProjectNodeOptions(_ProjectNodeOptions):
+    """
+    Make a node which executes expressions on input batches,
+    producing batches of the same length with new columns.
+
+    This is the option class for the "project" node factory.
+
+    The "project" operation rearranges, deletes, transforms, and
+    creates columns. Each output column is computed by evaluating
+    an expression against the source record batch. These must be
+    scalar expressions (expressions consisting of scalar literals,
+    field references and scalar functions, i.e. elementwise functions
+    that return one value for each input row independent of the value
+    of all other rows).
+
+    Parameters
+    ----------
+    expressions : list of pyarrow.compute.Expression
+        List of expressions to evaluate against the source batch. These
+        must be scalar expressions.
+    names : list of str, optional
+        List of names for each of the output columns (same length as
+        `expressions`). If `names` is not provided, the string
+        representations of exprs will be used.
+    """
+
+    def __init__(self, expressions, names=None):
+        self._set_options(expressions, names)
+
+
+cdef class _AggregateNodeOptions(ExecNodeOptions):
+
+    def _set_options(self, aggregates, keys=None):
+        cdef:
+            CAggregate c_aggr
+            vector[CAggregate] c_aggregations
+            vector[CFieldRef] c_keys
+
+        for arg_names, func_name, opts, name in aggregates:
+            c_aggr.function = tobytes(func_name)
+            if opts is not None:
+                c_aggr.options = (<FunctionOptions?>opts).wrapped
+            else:
+                c_aggr.options = <shared_ptr[CFunctionOptions]>nullptr
+            if not isinstance(arg_names, (list, tuple)):
+                arg_names = [arg_names]
+            for arg in arg_names:
+                c_aggr.target.push_back(_ensure_field_ref(arg))
+            c_aggr.name = tobytes(name)
+
+            c_aggregations.push_back(move(c_aggr))
+
+        if keys is None:
+            keys = []
+        for name in keys:
+            c_keys.push_back(_ensure_field_ref(name))
+
+        self.wrapped.reset(
+            new CAggregateNodeOptions(c_aggregations, c_keys)
+        )
+
+
+class AggregateNodeOptions(_AggregateNodeOptions):
+    """
+    Make a node which aggregates input batches, optionally grouped by keys.
+
+    This is the option class for the "aggregate" node factory.
+
+    Acero supports two types of aggregates: "scalar" aggregates,
+    and "hash" aggregates. Scalar aggregates reduce an array or scalar
+    input to a single scalar output (e.g. computing the mean of a column).
+    Hash aggregates act like GROUP BY in SQL and first partition data
+    based on one or more key columns, then reduce the data in each partition.
+    The aggregate node supports both types of computation, and can compute
+    any number of aggregations at once.
+
+    Parameters
+    ----------
+    aggregates : list of tuples
+        Aggregations which will be applied to the targeted fields.
+        Specified as a list of tuples, where each tuple is one aggregation
+        specification and consists of: aggregation target column(s) followed
+        by function name, aggregation function options object and the
+        output field name.
+        The target column(s) specification can be a single field reference,
+        an empty list, or a list of field references, for unary, nullary
+        and n-ary aggregation functions respectively. Each field reference
+        can be a string column name or expression.
+    keys : list of field references, optional
+        Keys by which aggregations will be grouped. Each key can reference
+        a field using a string name or expression.
+    """
+
+    def __init__(self, aggregates, keys=None):
+        self._set_options(aggregates, keys)
+
+
+cdef class _HashJoinNodeOptions(ExecNodeOptions):
+
+    def _set_options(
+        self, join_type, left_keys, right_keys, left_output=None, right_output=None,
+        output_suffix_for_left="", output_suffix_for_right="",
+    ):
+        cdef:
+            CJoinType c_join_type
+            vector[CFieldRef] c_left_keys
+            vector[CFieldRef] c_right_keys
+            vector[CFieldRef] c_left_output
+            vector[CFieldRef] c_right_output
+
+        # join type
+        if join_type == "left semi":
+            c_join_type = CJoinType_LEFT_SEMI
+        elif join_type == "right semi":
+            c_join_type = CJoinType_RIGHT_SEMI
+        elif join_type == "left anti":
+            c_join_type = CJoinType_LEFT_ANTI
+        elif join_type == "right anti":
+            c_join_type = CJoinType_RIGHT_ANTI
+        elif join_type == "inner":
+            c_join_type = CJoinType_INNER
+        elif join_type == "left outer":
+            c_join_type = CJoinType_LEFT_OUTER
+        elif join_type == "right outer":
+            c_join_type = CJoinType_RIGHT_OUTER
+        elif join_type == "full outer":
+            c_join_type = CJoinType_FULL_OUTER
+        else:
+            raise ValueError("Unsupported join type")
+
+        # left/right keys
+        if not isinstance(left_keys, (list, tuple)):
+            left_keys = [left_keys]
+        for key in left_keys:
+            c_left_keys.push_back(_ensure_field_ref(key))
+        if not isinstance(right_keys, (list, tuple)):
+            right_keys = [right_keys]
+        for key in right_keys:
+            c_right_keys.push_back(_ensure_field_ref(key))
+
+        # left/right output fields
+        if left_output is not None and right_output is not None:
+            for colname in left_output:
+                c_left_output.push_back(_ensure_field_ref(colname))
+            for colname in right_output:
+                c_right_output.push_back(_ensure_field_ref(colname))
+
+            self.wrapped.reset(
+                new CHashJoinNodeOptions(
+                    c_join_type, c_left_keys, c_right_keys,
+                    c_left_output, c_right_output,
+                    _true,
+                    <c_string>tobytes(output_suffix_for_left),
+                    <c_string>tobytes(output_suffix_for_right)
+                )
+            )
+        else:
+            self.wrapped.reset(
+                new CHashJoinNodeOptions(
+                    c_join_type, c_left_keys, c_right_keys,
+                    _true,
+                    <c_string>tobytes(output_suffix_for_left),
+                    <c_string>tobytes(output_suffix_for_right)
+                )
+            )
+
+
+class HashJoinNodeOptions(_HashJoinNodeOptions):
+    """
+    Make a node which implements a join operation using a hash join strategy.
+
+    This is the option class for the "hashjoin" node factory.
+
+    Parameters
+    ----------
+    join_type : str
+        Type of join. One of "left semi", "right semi", "left anti",
+        "right anti", "inner", "left outer", "right outer", "full outer".
+    left_keys : str, Expression or list
+        Key fields from left input. Each key can be a string column name
+        or a field expression, or a list of such field references.
+    right_keys : str, Expression or list
+        Key fields from right input. See `left_keys` for details.
+    left_output : list, optional
+        List of output fields passed from left input. If left and right
+        output fields are not specified, all valid fields from both left and
+        right input will be output. Each field can be a string column name
+        or a field expression.
+    right_output : list, optional
+        List of output fields passed from right input. If left and right
+        output fields are not specified, all valid fields from both left and
+        right input will be output. Each field can be a string column name
+        or a field expression.
+    output_suffix_for_left : str
+        Suffix added to names of output fields coming from left input
+        (used to distinguish, if necessary, between fields of the same
+        name in left and right input and can be left empty if there are
+        no name collisions).
+    output_suffix_for_right : str
+        Suffix added to names of output fields coming from right input,
+        see `output_suffix_for_left` for details.
+    """
+
+    def __init__(
+        self, join_type, left_keys, right_keys, left_output=None, right_output=None,
+        output_suffix_for_left="", output_suffix_for_right=""
+    ):
+        self._set_options(
+            join_type, left_keys, right_keys, left_output, right_output,
+            output_suffix_for_left, output_suffix_for_right
+        )
+
+
+cdef class Declaration(_Weakrefable):
+    """
+    Helper class for declaring the nodes of an ExecPlan.
+
+    A Declaration represents an unconstructed ExecNode, and potentially
+    more, since its inputs may also be Declarations, or several
+    Declarations can be combined into one with ``from_sequence``.
+
+    The possible ExecNodes to use are registered with a name,
+    the "factory name", and need to be specified using this name, together
+    with the corresponding ExecNodeOptions subclass.
+
+    Parameters
+    ----------
+    factory_name : str
+        The ExecNode factory name, such as "table_source", "filter",
+        "project" etc. See the ExecNodeOptions subclasses for the exact
+        factory names to use.
+    options : ExecNodeOptions
+        Corresponding ExecNodeOptions subclass (matching the factory name).
+    inputs : list of Declaration, optional
+        Input nodes for this declaration. Optional if the node is a source
+        node, or when the declaration gets combined later with
+        ``from_sequence``.
+
+    Returns
+    -------
+    Declaration
+    """
+    cdef void init(self, const CDeclaration& c_decl):
+        self.decl = c_decl
+
+    @staticmethod
+    cdef wrap(const CDeclaration& c_decl):
+        cdef Declaration self = Declaration.__new__(Declaration)
+        self.init(c_decl)
+        return self
+
+    cdef inline CDeclaration unwrap(self) nogil:
+        return self.decl
+
+    def __init__(self, factory_name, ExecNodeOptions options, inputs=None):
+        cdef:
+            c_string c_factory_name
+            CDeclaration c_decl
+            vector[CDeclaration.Input] c_inputs
+
+        c_factory_name = tobytes(factory_name)
+
+        if inputs is not None:
+            for ipt in inputs:
+                c_inputs.push_back(
+                    CDeclaration.Input((<Declaration>ipt).unwrap())
+                )
+
+        c_decl = CDeclaration(c_factory_name, c_inputs, options.unwrap())
+        self.init(c_decl)
+
+    @staticmethod
+    def from_sequence(decls):
+        """
+        Convenience factory for the common case of a simple sequence of nodes.
+
+        Each of the declarations will be appended to the inputs of the
+        subsequent declaration, and the final modified declaration will
+        be returned.
+
+        Parameters
+        ----------
+        decls : list of Declaration
+
+        Returns
+        -------
+        Declaration
+        """
+        cdef:
+            vector[CDeclaration] c_decls
+            CDeclaration c_decl
+
+        for decl in decls:
+            c_decls.push_back((<Declaration> decl).unwrap())
+
+        c_decl = CDeclaration.Sequence(c_decls)
+        return Declaration.wrap(c_decl)
+
+    def __str__(self):
+        return frombytes(GetResultValue(DeclarationToString(self.decl)))
+
+    def __repr__(self):
+        return "<pyarrow.acero.Declaration>\n{0}".format(str(self))
+
+    def to_table(self, bint use_threads=True):
+        """
+        Run the declaration and collect the results into a table.
+
+        This method will implicitly add a sink node to the declaration
+        to collect results into a table. It will then create an ExecPlan
+        from the declaration, start the exec plan, block until the plan
+        has finished, and return the created table.
+
+        Parameters
+        ----------
+        use_threads : bool, default True
+            If set to False, then all CPU work will be done on the calling
+            thread. I/O tasks will still happen on the I/O executor
+            and may be multi-threaded (but should not use significant CPU
+            resources).
+
+        Returns
+        -------
+        pyarrow.Table
+        """
+        cdef:
+            shared_ptr[CTable] c_table
+
+        with nogil:
+            c_table = GetResultValue(DeclarationToTable(self.unwrap(), use_threads))
+        return pyarrow_wrap_table(c_table)
+
+    def to_reader(self, bint use_threads=True):
+        """Run the declaration and return results as a RecordBatchReader.
+
+        For details about the parameters, see `to_table`.
+
+        Returns
+        -------
+        pyarrow.RecordBatchReader
+        """
+        cdef:
+            RecordBatchReader reader
+        reader = RecordBatchReader.__new__(RecordBatchReader)
+        reader.reader.reset(
+            GetResultValue(DeclarationToReader(self.unwrap(), use_threads)).release()
+        )
+        return reader
diff --git a/python/pyarrow/_compute.pxd b/python/pyarrow/_compute.pxd
index ee348e9816..da195f9d63 100644
--- a/python/pyarrow/_compute.pxd
+++ b/python/pyarrow/_compute.pxd
@@ -62,3 +62,5 @@ cdef class Expression(_Weakrefable):
 
 
 cdef CExpression _true
+
+cdef CFieldRef _ensure_field_ref(value) except *
diff --git a/python/pyarrow/_compute.pyx b/python/pyarrow/_compute.pyx
index c726b08449..b0e3026b6f 100644
--- a/python/pyarrow/_compute.pyx
+++ b/python/pyarrow/_compute.pyx
@@ -1395,38 +1395,43 @@ class MakeStructOptions(_MakeStructOptions):
         self._set_options(field_names, field_nullability, field_metadata)
 
 
+cdef CFieldRef _ensure_field_ref(value) except *:
+    cdef:
+        CFieldRef field_ref
+        const CFieldRef* field_ref_ptr
+
+    if isinstance(value, (list, tuple)):
+        value = Expression._nested_field(tuple(value))
+
+    if isinstance(value, Expression):
+        field_ref_ptr = (<Expression>value).unwrap().field_ref()
+        if field_ref_ptr is NULL:
+            raise ValueError("Unable to get FieldRef from Expression")
+        field_ref = <CFieldRef>deref(field_ref_ptr)
+    elif isinstance(value, (bytes, str)):
+        if value.startswith(b'.' if isinstance(value, bytes) else '.'):
+            field_ref = GetResultValue(
+                CFieldRef.FromDotPath(<c_string>tobytes(value)))
+        else:
+            field_ref = CFieldRef(<c_string>tobytes(value))
+    elif isinstance(value, int):
+        field_ref = CFieldRef(<int> value)
+    else:
+        raise TypeError("Expected a field reference as a str or int, list of "
+                        f"str or int, or Expression. Got {type(value)} instead.")
+    return field_ref
+
+
 cdef class _StructFieldOptions(FunctionOptions):
     def _set_options(self, indices):
-        cdef:
-            CFieldRef field_ref
-            const CFieldRef* field_ref_ptr
 
-        if isinstance(indices, (list, tuple)):
-            if len(indices):
-                indices = Expression._nested_field(tuple(indices))
-            else:
-                # Allow empty indices; effecitively return same array
-                self.wrapped.reset(
-                    new CStructFieldOptions(<vector[int]>indices))
-                return
-
-        if isinstance(indices, Expression):
-            field_ref_ptr = (<Expression>indices).unwrap().field_ref()
-            if field_ref_ptr is NULL:
-                raise ValueError("Unable to get CFieldRef from Expression")
-            field_ref = <CFieldRef>deref(field_ref_ptr)
-        elif isinstance(indices, (bytes, str)):
-            if indices.startswith(b'.' if isinstance(indices, bytes) else '.'):
-                field_ref = GetResultValue(
-                    CFieldRef.FromDotPath(<c_string>tobytes(indices)))
-            else:
-                field_ref = CFieldRef(<c_string>tobytes(indices))
-        elif isinstance(indices, int):
-            field_ref = CFieldRef(<int> indices)
-        else:
-            raise TypeError("Expected List[str], List[int], List[bytes], "
-                            "Expression, bytes, str, or int. "
-                            f"Got: {type(indices)}")
+        if isinstance(indices, (list, tuple)) and not len(indices):
+            # Allow empty indices; effectively return same array
+            self.wrapped.reset(
+                new CStructFieldOptions(<vector[int]>indices))
+            return
+
+        cdef CFieldRef field_ref = _ensure_field_ref(indices)
         self.wrapped.reset(new CStructFieldOptions(field_ref))
 
 
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index 4725ae8f44..295d89d351 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -2583,6 +2583,7 @@ cdef extern from "arrow/compute/exec/options.h" namespace "arrow::compute" nogil
         pass
 
     cdef cppclass CTableSourceNodeOptions "arrow::compute::TableSourceNodeOptions"(CExecNodeOptions):
+        CTableSourceNodeOptions(shared_ptr[CTable] table)
         CTableSourceNodeOptions(shared_ptr[CTable] table, int64_t max_batch_size)
 
     cdef cppclass CSinkNodeOptions "arrow::compute::SinkNodeOptions"(CExecNodeOptions):
@@ -2596,6 +2597,9 @@ cdef extern from "arrow/compute/exec/options.h" namespace "arrow::compute" nogil
         CProjectNodeOptions(vector[CExpression] expressions,
                             vector[c_string] names)
 
+    cdef cppclass CAggregateNodeOptions "arrow::compute::AggregateNodeOptions"(CExecNodeOptions):
+        CAggregateNodeOptions(vector[CAggregate] aggregates, vector[CFieldRef] names)
+
     cdef cppclass COrderBySinkNodeOptions "arrow::compute::OrderBySinkNodeOptions"(CExecNodeOptions):
         COrderBySinkNodeOptions(vector[CSortOptions] options,
                                 CAsyncExecBatchGenerator generator)
@@ -2666,6 +2670,19 @@ cdef extern from "arrow/compute/exec/exec_plan.h" namespace "arrow::compute" nog
                                      vector[CExecNode*] inputs,
                                      const CExecNodeOptions& options)
 
+    CResult[shared_ptr[CTable]] DeclarationToTable(
+        CDeclaration declaration, c_bool use_threads
+    )
+    CResult[shared_ptr[CTable]] DeclarationToTable(
+        CDeclaration declaration, c_bool use_threads,
+        CMemoryPool* memory_pool, CFunctionRegistry* function_registry
+    )
+    CResult[unique_ptr[CRecordBatchReader]] DeclarationToReader(
+        CDeclaration declaration, c_bool use_threads
+    )
+
+    CResult[c_string] DeclarationToString(const CDeclaration& declaration)
+
 
 cdef extern from "arrow/extension_type.h" namespace "arrow":
     cdef cppclass CExtensionTypeRegistry" arrow::ExtensionTypeRegistry":
diff --git a/python/pyarrow/tests/test_acero.py b/python/pyarrow/tests/test_acero.py
new file mode 100644
index 0000000000..4cba55f7e4
--- /dev/null
+++ b/python/pyarrow/tests/test_acero.py
@@ -0,0 +1,293 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pytest
+
+import pyarrow as pa
+import pyarrow.compute as pc
+from pyarrow.compute import field
+
+from pyarrow._acero import (
+    TableSourceNodeOptions,
+    FilterNodeOptions,
+    ProjectNodeOptions,
+    AggregateNodeOptions,
+    HashJoinNodeOptions,
+    Declaration,
+)
+
+
+@pytest.fixture
+def table_source():
+    table = pa.table({'a': [1, 2, 3], 'b': [4, 5, 6]})
+    table_opts = TableSourceNodeOptions(table)
+    table_source = Declaration("table_source", options=table_opts)
+    return table_source
+
+
+def test_declaration():
+
+    table = pa.table({'a': [1, 2, 3], 'b': [4, 5, 6]})
+    table_opts = TableSourceNodeOptions(table)
+    filter_opts = FilterNodeOptions(field('a') > 1)
+
+    # using sequence
+    decl = Declaration.from_sequence([
+        Declaration("table_source", options=table_opts),
+        Declaration("filter", options=filter_opts)
+    ])
+    result = decl.to_table()
+    assert result.equals(table.slice(1, 2))
+
+    # using explicit inputs
+    table_source = Declaration("table_source", options=table_opts)
+    filtered = Declaration("filter", options=filter_opts, 
inputs=[table_source])
+    result = filtered.to_table()
+    assert result.equals(table.slice(1, 2))
+
+
+def test_declaration_repr(table_source):
+
+    assert "TableSourceNode" in str(table_source)
+    assert "TableSourceNode" in repr(table_source)
+
+
+def test_declaration_to_reader(table_source):
+    with table_source.to_reader() as reader:
+        assert reader.schema == pa.schema([("a", pa.int64()), ("b", pa.int64())])
+        result = reader.read_all()
+    expected = pa.table({'a': [1, 2, 3], 'b': [4, 5, 6]})
+    assert result.equals(expected)
+
+
+def test_table_source():
+    with pytest.raises(TypeError):
+        TableSourceNodeOptions(pa.record_batch([pa.array([1, 2, 3])], ["a"]))
+
+    table_source = TableSourceNodeOptions(None)
+    decl = Declaration("table_source", table_source)
+    with pytest.raises(
+        ValueError, match="TableSourceNode requires table which is not null"
+    ):
+        _ = decl.to_table()
+
+
+def test_filter(table_source):
+    # referencing unknown field
+    decl = Declaration.from_sequence([
+        table_source,
+        Declaration("filter", options=FilterNodeOptions(field("c") > 1))
+    ])
+    with pytest.raises(ValueError, match=r"No match for FieldRef.Name\(c\)"):
+        _ = decl.to_table()
+
+    # requires a pyarrow Expression
+    with pytest.raises(TypeError):
+        FilterNodeOptions(pa.array([True, False, True]))
+    with pytest.raises(TypeError):
+        FilterNodeOptions(None)
+
+
+def test_project(table_source):
+    # default name from expression
+    decl = Declaration.from_sequence([
+        table_source,
+        Declaration("project", ProjectNodeOptions([pc.multiply(field("a"), 
2)]))
+    ])
+    result = decl.to_table()
+    assert result.schema.names == ["multiply(a, 2)"]
+    assert result[0].to_pylist() == [2, 4, 6]
+
+    # provide name
+    decl = Declaration.from_sequence([
+        table_source,
+        Declaration("project", ProjectNodeOptions([pc.multiply(field("a"), 
2)], ["a2"]))
+    ])
+    result = decl.to_table()
+    assert result.schema.names == ["a2"]
+    assert result["a2"].to_pylist() == [2, 4, 6]
+
+    # input validation
+    with pytest.raises(ValueError):
+        ProjectNodeOptions([pc.multiply(field("a"), 2)], ["a2", "b2"])
+
+    # no scalar expression
+    decl = Declaration.from_sequence([
+        table_source,
+        Declaration("project", ProjectNodeOptions([pc.sum(field("a"))]))
+    ])
+    with pytest.raises(ValueError, match="cannot Execute non-scalar 
expression"):
+        _ = decl.to_table()
+
+
+def test_aggregate_scalar(table_source):
+    decl = Declaration.from_sequence([
+        table_source,
+        Declaration("aggregate", AggregateNodeOptions([("a", "sum", None, 
"a_sum")]))
+    ])
+    result = decl.to_table()
+    assert result.schema.names == ["a_sum"]
+    assert result["a_sum"].to_pylist() == [6]
+
+    # with options class
+    table = pa.table({'a': [1, 2, None]})
+    aggr_opts = AggregateNodeOptions(
+        [("a", "sum", pc.ScalarAggregateOptions(skip_nulls=False), "a_sum")]
+    )
+    decl = Declaration.from_sequence([
+        Declaration("table_source", TableSourceNodeOptions(table)),
+        Declaration("aggregate", aggr_opts),
+    ])
+    result = decl.to_table()
+    assert result.schema.names == ["a_sum"]
+    assert result["a_sum"].to_pylist() == [None]
+
+    # test various ways of specifying the target column
+    for target in ["a", field("a"), 0, field(0), ["a"], [field("a")], [0]]:
+        aggr_opts = AggregateNodeOptions([(target, "sum", None, "a_sum")])
+        decl = Declaration.from_sequence(
+            [table_source, Declaration("aggregate", aggr_opts)]
+        )
+        result = decl.to_table()
+        assert result.schema.names == ["a_sum"]
+        assert result["a_sum"].to_pylist() == [6]
+
+    # proper error when specifying the wrong number of target columns
+    aggr_opts = AggregateNodeOptions([(["a", "b"], "sum", None, "a_sum")])
+    decl = Declaration.from_sequence(
+        [table_source, Declaration("aggregate", aggr_opts)]
+    )
+    with pytest.raises(
+        ValueError, match="Function 'sum' accepts 1 arguments but 2 passed"
+    ):
+        _ = decl.to_table()
+
+    # proper error when using hash aggregation without keys
+    aggr_opts = AggregateNodeOptions([("a", "hash_sum", None, "a_sum")])
+    decl = Declaration.from_sequence(
+        [table_source, Declaration("aggregate", aggr_opts)]
+    )
+    with pytest.raises(ValueError, match="is a hash aggregate function"):
+        _ = decl.to_table()
+
+
+def test_aggregate_hash():
+    table = pa.table({'a': [1, 2, None], 'b': ["foo", "bar", "foo"]})
+    table_opts = TableSourceNodeOptions(table)
+    table_source = Declaration("table_source", options=table_opts)
+
+    # default options
+    aggr_opts = AggregateNodeOptions(
+        [("a", "hash_count", None, "count(a)")], keys=["b"])
+    decl = Declaration.from_sequence([
+        table_source, Declaration("aggregate", aggr_opts)
+    ])
+    result = decl.to_table()
+    expected = pa.table({"count(a)": [1, 1], "b": ["foo", "bar"]})
+    assert result.equals(expected)
+
+    # specify function options
+    aggr_opts = AggregateNodeOptions(
+        [("a", "hash_count", pc.CountOptions("all"), "count(a)")], keys=["b"]
+    )
+    decl = Declaration.from_sequence([
+        table_source, Declaration("aggregate", aggr_opts)
+    ])
+    result = decl.to_table()
+    expected_all = pa.table({"count(a)": [2, 1], "b": ["foo", "bar"]})
+    assert result.equals(expected_all)
+
+    # specify keys as field references
+    aggr_opts = AggregateNodeOptions(
+        [("a", "hash_count", None, "count(a)")], keys=[field("b")]
+    )
+    decl = Declaration.from_sequence([
+        table_source, Declaration("aggregate", aggr_opts)
+    ])
+    result = decl.to_table()
+    assert result.equals(expected)
+
+    # wrong type of (aggregation) function
+    # TODO test with kernel that matches number of arguments (arity) -> avoid segfault
+    aggr_opts = AggregateNodeOptions([("a", "sum", None, "a_sum")], keys=["b"])
+    decl = Declaration.from_sequence([
+        table_source, Declaration("aggregate", aggr_opts)
+    ])
+    with pytest.raises(ValueError):
+        _ = decl.to_table()
+
+
+def test_hash_join():
+    left = pa.table({'key': [1, 2, 3], 'a': [4, 5, 6]})
+    left_source = Declaration("table_source", 
options=TableSourceNodeOptions(left))
+    right = pa.table({'key': [2, 3, 4], 'b': [4, 5, 6]})
+    right_source = Declaration("table_source", 
options=TableSourceNodeOptions(right))
+
+    # inner join
+    join_opts = HashJoinNodeOptions("inner", left_keys="key", right_keys="key")
+    joined = Declaration(
+        "hashjoin", options=join_opts, inputs=[left_source, right_source])
+    result = joined.to_table()
+    expected = pa.table(
+        [[2, 3], [5, 6], [2, 3], [4, 5]],
+        names=["key", "a", "key", "b"])
+    assert result.equals(expected)
+
+    for keys in [field("key"), ["key"], [field("key")]]:
+        join_opts = HashJoinNodeOptions("inner", left_keys=keys, 
right_keys=keys)
+        joined = Declaration(
+            "hashjoin", options=join_opts, inputs=[left_source, right_source])
+        result = joined.to_table()
+        assert result.equals(expected)
+
+    # left join
+    join_opts = HashJoinNodeOptions(
+        "left outer", left_keys="key", right_keys="key")
+    joined = Declaration(
+        "hashjoin", options=join_opts, inputs=[left_source, right_source])
+    result = joined.to_table()
+    expected = pa.table(
+        [[1, 2, 3], [4, 5, 6], [None, 2, 3], [None, 4, 5]],
+        names=["key", "a", "key", "b"]
+    )
+    assert result.sort_by("a").equals(expected)
+
+    # suffixes
+    join_opts = HashJoinNodeOptions(
+        "left outer", left_keys="key", right_keys="key",
+        output_suffix_for_left="_left", output_suffix_for_right="_right")
+    joined = Declaration(
+        "hashjoin", options=join_opts, inputs=[left_source, right_source])
+    result = joined.to_table()
+    expected = pa.table(
+        [[1, 2, 3], [4, 5, 6], [None, 2, 3], [None, 4, 5]],
+        names=["key_left", "a", "key_right", "b"]
+    )
+    assert result.sort_by("a").equals(expected)
+
+    # manually specifying output columns
+    join_opts = HashJoinNodeOptions(
+        "left outer", left_keys="key", right_keys="key",
+        left_output=["key", "a"], right_output=[field("b")])
+    joined = Declaration(
+        "hashjoin", options=join_opts, inputs=[left_source, right_source])
+    result = joined.to_table()
+    expected = pa.table(
+        [[1, 2, 3], [4, 5, 6], [None, 4, 5]],
+        names=["key", "a", "b"]
+    )
+    assert result.sort_by("a").equals(expected)
diff --git a/python/setup.py b/python/setup.py
index 4ee397ed4b..7598133206 100755
--- a/python/setup.py
+++ b/python/setup.py
@@ -213,6 +213,7 @@ class build_ext(_build_ext):
         '_dataset_orc',
         '_dataset_parquet',
         '_exec_plan',
+        '_acero',
         '_feather',
         '_parquet',
         '_parquet_encryption',

