jorisvandenbossche commented on code in PR #34102: URL: https://github.com/apache/arrow/pull/34102#discussion_r1107387879
########## python/pyarrow/_acero.pyx: ########## @@ -0,0 +1,459 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# --------------------------------------------------------------------- +# Low-level Acero bindings + +# cython: profile=False +# distutils: language = c++ +# cython: language_level = 3 + +from pyarrow.includes.common cimport * +from pyarrow.includes.libarrow cimport * +from pyarrow.includes.libarrow_dataset cimport * +from pyarrow.lib cimport (Table, check_status, pyarrow_unwrap_table, pyarrow_wrap_table, + RecordBatchReader) +from pyarrow.lib import frombytes, tobytes +from pyarrow._compute cimport Expression, FunctionOptions, _ensure_field_ref, _true +from pyarrow.compute import field + +# Initialize() # Initialise support for Datasets in ExecPlan + + +cdef class ExecNodeOptions(_Weakrefable): + __slots__ = () # avoid mistakenly creating attributes + + cdef const CExecNodeOptions* get_options(self) except NULL: + return self.wrapped.get() + + cdef void init(self, const shared_ptr[CExecNodeOptions]& sp): + self.wrapped = sp + + cdef inline shared_ptr[CExecNodeOptions] unwrap(self): + return self.wrapped + + # def __repr__(self): + # type_name = self.__class__.__name__ + # # Remove {} so we can use our own braces + 
# string_repr = frombytes(self.get_options().ToString())[1:-1] + # return f"{type_name}({string_repr})" + + # def __eq__(self, FunctionOptions other): + # return self.get_options().Equals(deref(other.get_options())) + + +cdef class _TableSourceNodeOptions(ExecNodeOptions): + + def _set_options(self, Table table): + cdef: + shared_ptr[CTable] c_table + + c_table = pyarrow_unwrap_table(table) + self.wrapped.reset( + new CTableSourceNodeOptions(c_table) + ) + + +class TableSourceNodeOptions(_TableSourceNodeOptions): + """ + A Source node which accepts a table. + + Parameters + ---------- + table : pyarrow.Table + The table which acts as the data source. + """ + + def __init__(self, Table table): + self._set_options(table) + + +cdef class _FilterNodeOptions(ExecNodeOptions): + + def _set_options(self, Expression filter_expression not None): + self.wrapped.reset( + new CFilterNodeOptions(<CExpression>filter_expression.unwrap()) + ) + + +class FilterNodeOptions(_FilterNodeOptions): + """ + Make a node which excludes some rows from batches passed through it. + + The "filter" operation provides an option to define data filtering + criteria. It selects rows matching a given expression. Filters can + be written using pyarrow.compute.Expression. 
+ + Parameters + ---------- + filter_expression : pyarrow.compute.Expression + + """ + + def __init__(self, Expression filter_expression): + self._set_options(filter_expression) + + +cdef class _ProjectNodeOptions(ExecNodeOptions): + + def _set_options(self, expressions, names=None): + cdef: + Expression expr + vector[CExpression] c_expressions + vector[c_string] c_names + + for expr in expressions: + c_expressions.push_back(expr.unwrap()) + + if names is not None: + if len(names) != len(expressions): + raise ValueError("dd") + + for name in names: + c_names.push_back(<c_string>tobytes(name)) + + self.wrapped.reset( + new CProjectNodeOptions(c_expressions, c_names) + ) + else: + self.wrapped.reset( + new CProjectNodeOptions(c_expressions) + ) + + +class ProjectNodeOptions(_ProjectNodeOptions): + """ + Make a node which executes expressions on input batches, + producing new batches. + + The "project" operation rearranges, deletes, transforms, and + creates columns. Each output column is computed by evaluating + an expression against the source record batch. + + Parameters + ---------- + expressions : list of pyarrow.compute.Expression + List of expressions to evaluate against the source batch. + names : list of str + List of names for each of the output columns (same length as + `expressions`). If `names` is not provided, the string + representations of exprs will be used. 
+ """ + + def __init__(self, expressions, names=None): + self._set_options(expressions, names) + + +cdef class _AggregateNodeOptions(ExecNodeOptions): + + def _set_options(self, aggregates, keys=None): + cdef: + CAggregate c_aggr + vector[CAggregate] c_aggregations + vector[CFieldRef] c_keys + + for arg_names, func_name, opts, name in aggregates: + c_aggr.function = tobytes(func_name) + if opts is not None: + c_aggr.options = (<FunctionOptions?>opts).wrapped + else: + c_aggr.options = <shared_ptr[CFunctionOptions]>nullptr + for arg in arg_names: + c_aggr.target.push_back(_ensure_field_ref(arg)) + c_aggr.name = tobytes(name) + + c_aggregations.push_back(move(c_aggr)) + + if keys is None: + keys = [] + for name in keys: + c_keys.push_back(_ensure_field_ref(name)) + + self.wrapped.reset( + new CAggregateNodeOptions(c_aggregations, c_keys) + ) + + +class AggregateNodeOptions(_AggregateNodeOptions): + """ + Make a node which aggregates input batches, optionally grouped by keys. + + Acero supports two types of aggregates: "scalar" aggregates, + and "hash" aggregates. Scalar aggregates reduce an array or scalar + input to a single scalar output (e.g. computing the mean of a column). + Hash aggregates act like GROUP BY in SQL and first partition data + based on one or more key columns, then reduce the data in each partition. + The aggregate node supports both types of computation, and can compute + any number of aggregations at once. + + Parameters + ---------- + aggregates : list of tuples + Aggregations which will be applied to the targeted fields. + Specified as a list of tuples, where each tuple is one aggregation + specification and consists of: aggregation column name followed + by function name, aggregation function options object and the + output field name. + The column name can be a string, an empty list or a list of + column names, for unary, nullary and n-ary aggregation functions + respectively. 
+ keys : list, optional + Keys by which aggregations will be grouped. + + """ + + def __init__(self, aggregates, keys=None): + self._set_options(aggregates, keys) + + +cdef class _HashJoinNodeOptions(ExecNodeOptions): + # _join_type_map = { + # "left semi": CJoinType_LEFT_SEMI, + # "right semi": CJoinType_RIGHT_SEMI, + # "left anti": CJoinType_LEFT_ANTI, + # "right anti": CJoinType_RIGHT_ANTI, + # "inner": CJoinType_INNER, + # "left outer": CJoinType_LEFT_OUTER, + # "right outer": CJoinType_RIGHT_OUTER, + # "full outer": CJoinType_FULL_OUTER, + # } + + def _set_options( + self, join_type, left_keys, right_keys, left_output=None, right_output=None, + output_suffix_for_left="", output_suffix_for_right="", + ): + cdef: + CJoinType c_join_type + vector[CFieldRef] c_left_keys + vector[CFieldRef] c_right_keys + vector[CFieldRef] c_left_output + vector[CFieldRef] c_right_output + + # join type + # try: + # c_join_type = self._join_type_map[join_type] + # except KeyError: + # raise ValueError("Unsupported join type") Review Comment: I am getting compile errors with this more compact option, so I wrote it out with if/else branches below (but I will still look into the errors) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
