westonpace commented on code in PR #34102: URL: https://github.com/apache/arrow/pull/34102#discussion_r1109135570
########## python/pyarrow/_acero.pxd: ########## @@ -0,0 +1,44 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# cython: language_level = 3 + +from pyarrow.lib cimport * +from pyarrow.includes.common cimport * +from pyarrow.includes.libarrow cimport * + + +cdef class ExecNodeOptions(_Weakrefable): + cdef: + shared_ptr[CExecNodeOptions] wrapped + + cdef const CExecNodeOptions* get_options(self) except NULL + cdef void init(self, const shared_ptr[CExecNodeOptions]& sp) + cdef inline shared_ptr[CExecNodeOptions] unwrap(self) Review Comment: `nogil`? Or do you only add that when needed? I don't really know and am just looking at the difference between the two `unwrap` methods and hopefully learning something new :) ########## python/pyarrow/_acero.pyx: ########## @@ -0,0 +1,459 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# --------------------------------------------------------------------- +# Low-level Acero bindings + +# cython: profile=False +# distutils: language = c++ +# cython: language_level = 3 + +from pyarrow.includes.common cimport * +from pyarrow.includes.libarrow cimport * +from pyarrow.includes.libarrow_dataset cimport * +from pyarrow.lib cimport (Table, check_status, pyarrow_unwrap_table, pyarrow_wrap_table, + RecordBatchReader) +from pyarrow.lib import frombytes, tobytes +from pyarrow._compute cimport Expression, FunctionOptions, _ensure_field_ref, _true +from pyarrow.compute import field + +# Initialize() # Initialise support for Datasets in ExecPlan + + +cdef class ExecNodeOptions(_Weakrefable): + __slots__ = () # avoid mistakingly creating attributes + + cdef const CExecNodeOptions* get_options(self) except NULL: + return self.wrapped.get() Review Comment: What is this for? Why use this over `unwrap`? ########## python/pyarrow/_acero.pyx: ########## @@ -0,0 +1,459 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# --------------------------------------------------------------------- +# Low-level Acero bindings + +# cython: profile=False +# distutils: language = c++ +# cython: language_level = 3 + +from pyarrow.includes.common cimport * +from pyarrow.includes.libarrow cimport * +from pyarrow.includes.libarrow_dataset cimport * +from pyarrow.lib cimport (Table, check_status, pyarrow_unwrap_table, pyarrow_wrap_table, + RecordBatchReader) +from pyarrow.lib import frombytes, tobytes +from pyarrow._compute cimport Expression, FunctionOptions, _ensure_field_ref, _true +from pyarrow.compute import field + +# Initialize() # Initialise support for Datasets in ExecPlan + + +cdef class ExecNodeOptions(_Weakrefable): + __slots__ = () # avoid mistakingly creating attributes + + cdef const CExecNodeOptions* get_options(self) except NULL: + return self.wrapped.get() + + cdef void init(self, const shared_ptr[CExecNodeOptions]& sp): + self.wrapped = sp + + cdef inline shared_ptr[CExecNodeOptions] unwrap(self): + return self.wrapped + + # def __repr__(self): + # type_name = self.__class__.__name__ + # # Remove {} so we can use our own braces + # string_repr = frombytes(self.get_options().ToString())[1:-1] Review Comment: Sadly, I don't think options are printable. Declarations can be printed with `DeclarationToString`. We could probably add `ToString` to the options objects if needed but I don't think that is typical (e.g. CSV options, etc.) ########## python/pyarrow/_acero.pyx: ########## @@ -0,0 +1,459 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
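For reference on the printing point in the comment above: while the options objects have no `ToString`, a `Declaration` can already be rendered. A minimal sketch, assuming the public `pyarrow.acero` module name this PR introduces (the exact printed text may differ):

```python
import pyarrow as pa
import pyarrow.acero as acero  # public module name assumed from this PR

table = pa.table({"a": [1, 2, 3]})
decl = acero.Declaration("table_source", acero.TableSourceNodeOptions(table))
# Declaration.__str__ delegates to DeclarationToString, so the node tree is
# printable even though the individual options objects are not:
print(decl)
```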
+ +# --------------------------------------------------------------------- +# Low-level Acero bindings + +# cython: profile=False +# distutils: language = c++ +# cython: language_level = 3 + +from pyarrow.includes.common cimport * +from pyarrow.includes.libarrow cimport * +from pyarrow.includes.libarrow_dataset cimport * +from pyarrow.lib cimport (Table, check_status, pyarrow_unwrap_table, pyarrow_wrap_table, + RecordBatchReader) +from pyarrow.lib import frombytes, tobytes +from pyarrow._compute cimport Expression, FunctionOptions, _ensure_field_ref, _true +from pyarrow.compute import field + +# Initialize() # Initialise support for Datasets in ExecPlan + + +cdef class ExecNodeOptions(_Weakrefable): + __slots__ = () # avoid mistakingly creating attributes + + cdef const CExecNodeOptions* get_options(self) except NULL: + return self.wrapped.get() + + cdef void init(self, const shared_ptr[CExecNodeOptions]& sp): + self.wrapped = sp + + cdef inline shared_ptr[CExecNodeOptions] unwrap(self): + return self.wrapped + + # def __repr__(self): + # type_name = self.__class__.__name__ + # # Remove {} so we can use our own braces + # string_repr = frombytes(self.get_options().ToString())[1:-1] + # return f"{type_name}({string_repr})" + + # def __eq__(self, FunctionOptions other): + # return self.get_options().Equals(deref(other.get_options())) + + +cdef class _TableSourceNodeOptions(ExecNodeOptions): + + def _set_options(self, Table table): + cdef: + shared_ptr[CTable] c_table + + c_table = pyarrow_unwrap_table(table) + self.wrapped.reset( + new CTableSourceNodeOptions(c_table) + ) + + +class TableSourceNodeOptions(_TableSourceNodeOptions): + """ + A Source node which accepts a table. + + Parameters + ---------- + table : pyarrow.Table + The table which acts as the data source. + """ + + def __init__(self, Table table): + self._set_options(table) + + +cdef class _FilterNodeOptions(ExecNodeOptions): + + def _set_options(self, Expression filter_expression not None): + self.wrapped.reset( + new CFilterNodeOptions(<CExpression>filter_expression.unwrap()) + ) + + +class FilterNodeOptions(_FilterNodeOptions): + """ + Make a node which excludes some rows from batches passed through it. + + The "filter" operation provides an option to define data filtering + criteria. It selects rows matching a given expression. Filters can Review Comment: "matching a given expression" is probably ok but it is a bit vague. Maybe "It selects rows where the given expression evaluates to true"? Also, we may want to clarify that the expression must have a return type of boolean. ########## python/pyarrow/_acero.pyx: ########## @@ -0,0 +1,459 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
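To make the point in the comment above concrete, a small sketch with `pyarrow.compute` expressions (the column name is hypothetical):

```python
import pyarrow.compute as pc

# Boolean-valued: keeps rows where the expression evaluates to true.
keep = pc.field("a") > 1

# Not boolean-valued -- using this as a filter expression should be
# rejected, since a filter must return a boolean result:
not_a_filter = pc.field("a") + 1
```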
+ +# --------------------------------------------------------------------- +# Low-level Acero bindings + +# cython: profile=False +# distutils: language = c++ +# cython: language_level = 3 + +from pyarrow.includes.common cimport * +from pyarrow.includes.libarrow cimport * +from pyarrow.includes.libarrow_dataset cimport * +from pyarrow.lib cimport (Table, check_status, pyarrow_unwrap_table, pyarrow_wrap_table, + RecordBatchReader) +from pyarrow.lib import frombytes, tobytes +from pyarrow._compute cimport Expression, FunctionOptions, _ensure_field_ref, _true +from pyarrow.compute import field + +# Initialize() # Initialise support for Datasets in ExecPlan + + +cdef class ExecNodeOptions(_Weakrefable): + __slots__ = () # avoid mistakingly creating attributes + + cdef const CExecNodeOptions* get_options(self) except NULL: + return self.wrapped.get() + + cdef void init(self, const shared_ptr[CExecNodeOptions]& sp): + self.wrapped = sp + + cdef inline shared_ptr[CExecNodeOptions] unwrap(self): + return self.wrapped + + # def __repr__(self): + # type_name = self.__class__.__name__ + # # Remove {} so we can use our own braces + # string_repr = frombytes(self.get_options().ToString())[1:-1] + # return f"{type_name}({string_repr})" + + # def __eq__(self, FunctionOptions other): + # return self.get_options().Equals(deref(other.get_options())) Review Comment: Value equality of options objects (i.e. `*self.unwrap() == *other.unwrap()`) should generally be good enough I think. ########## python/pyarrow/_acero.pyx: ########## @@ -0,0 +1,459 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +# --------------------------------------------------------------------- +# Low-level Acero bindings + +# cython: profile=False +# distutils: language = c++ +# cython: language_level = 3 + +from pyarrow.includes.common cimport * +from pyarrow.includes.libarrow cimport * +from pyarrow.includes.libarrow_dataset cimport * +from pyarrow.lib cimport (Table, check_status, pyarrow_unwrap_table, pyarrow_wrap_table, + RecordBatchReader) +from pyarrow.lib import frombytes, tobytes +from pyarrow._compute cimport Expression, FunctionOptions, _ensure_field_ref, _true +from pyarrow.compute import field + +# Initialize() # Initialise support for Datasets in ExecPlan + + +cdef class ExecNodeOptions(_Weakrefable): + __slots__ = () # avoid mistakingly creating attributes + + cdef const CExecNodeOptions* get_options(self) except NULL: + return self.wrapped.get() + + cdef void init(self, const shared_ptr[CExecNodeOptions]& sp): + self.wrapped = sp + + cdef inline shared_ptr[CExecNodeOptions] unwrap(self): + return self.wrapped + + # def __repr__(self): + # type_name = self.__class__.__name__ + # # Remove {} so we can use our own braces + # string_repr = frombytes(self.get_options().ToString())[1:-1] + # return f"{type_name}({string_repr})" + + # def __eq__(self, FunctionOptions other): + # return self.get_options().Equals(deref(other.get_options())) + + +cdef class _TableSourceNodeOptions(ExecNodeOptions): + + def _set_options(self, Table table): + cdef: + shared_ptr[CTable] c_table + + c_table = pyarrow_unwrap_table(table) + self.wrapped.reset( + new CTableSourceNodeOptions(c_table) + ) + + +class TableSourceNodeOptions(_TableSourceNodeOptions): + """ + A Source node which accepts a table. + + Parameters + ---------- + table : pyarrow.Table + The table which acts as the data source. + """ + + def __init__(self, Table table): + self._set_options(table) + + +cdef class _FilterNodeOptions(ExecNodeOptions): + + def _set_options(self, Expression filter_expression not None): + self.wrapped.reset( + new CFilterNodeOptions(<CExpression>filter_expression.unwrap()) + ) + + +class FilterNodeOptions(_FilterNodeOptions): + """ + Make a node which excludes some rows from batches passed through it. + + The "filter" operation provides an option to define data filtering + criteria. It selects rows matching a given expression. Filters can + be written using pyarrow.compute.Expression. + + Parameters + ---------- + filter_expression : pyarrow.compute.Expression + + """ + + def __init__(self, Expression filter_expression): + self._set_options(filter_expression) + + +cdef class _ProjectNodeOptions(ExecNodeOptions): + + def _set_options(self, expressions, names=None): + cdef: + Expression expr + vector[CExpression] c_expressions + vector[c_string] c_names + + for expr in expressions: + c_expressions.push_back(expr.unwrap()) + + if names is not None: + if len(names) != len(expressions): + raise ValueError("dd") + + for name in names: + c_names.push_back(<c_string>tobytes(name)) + + self.wrapped.reset( + new CProjectNodeOptions(c_expressions, c_names) Review Comment: In C++ we generally treat options objects as mutable. Though that does allow you to put them into invalid states. Not sure if you want to do the same here or not. ########## python/pyarrow/_acero.pyx: ########## @@ -0,0 +1,459 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# --------------------------------------------------------------------- +# Low-level Acero bindings + +# cython: profile=False +# distutils: language = c++ +# cython: language_level = 3 + +from pyarrow.includes.common cimport * +from pyarrow.includes.libarrow cimport * +from pyarrow.includes.libarrow_dataset cimport * +from pyarrow.lib cimport (Table, check_status, pyarrow_unwrap_table, pyarrow_wrap_table, + RecordBatchReader) +from pyarrow.lib import frombytes, tobytes +from pyarrow._compute cimport Expression, FunctionOptions, _ensure_field_ref, _true +from pyarrow.compute import field + +# Initialize() # Initialise support for Datasets in ExecPlan + + +cdef class ExecNodeOptions(_Weakrefable): + __slots__ = () # avoid mistakingly creating attributes + + cdef const CExecNodeOptions* get_options(self) except NULL: + return self.wrapped.get() + + cdef void init(self, const shared_ptr[CExecNodeOptions]& sp): + self.wrapped = sp + + cdef inline shared_ptr[CExecNodeOptions] unwrap(self): + return self.wrapped + + # def __repr__(self): + # type_name = self.__class__.__name__ + # # Remove {} so we can use our own braces + # string_repr = frombytes(self.get_options().ToString())[1:-1] + # return f"{type_name}({string_repr})" + + # def __eq__(self, FunctionOptions other): + # return self.get_options().Equals(deref(other.get_options())) + + +cdef class _TableSourceNodeOptions(ExecNodeOptions): + + def _set_options(self, Table table): + cdef: + shared_ptr[CTable] c_table + + c_table = pyarrow_unwrap_table(table) + self.wrapped.reset( + new CTableSourceNodeOptions(c_table) + ) + + +class TableSourceNodeOptions(_TableSourceNodeOptions): + """ + A Source node which accepts a table. + + Parameters + ---------- + table : pyarrow.Table + The table which acts as the data source. + """ + + def __init__(self, Table table): + self._set_options(table) + + +cdef class _FilterNodeOptions(ExecNodeOptions): + + def _set_options(self, Expression filter_expression not None): + self.wrapped.reset( + new CFilterNodeOptions(<CExpression>filter_expression.unwrap()) + ) + + +class FilterNodeOptions(_FilterNodeOptions): + """ + Make a node which excludes some rows from batches passed through it. + + The "filter" operation provides an option to define data filtering + criteria. It selects rows matching a given expression. Filters can + be written using pyarrow.compute.Expression. 
+ + Parameters + ---------- + filter_expression : pyarrow.compute.Expression + + """ + + def __init__(self, Expression filter_expression): + self._set_options(filter_expression) + + +cdef class _ProjectNodeOptions(ExecNodeOptions): + + def _set_options(self, expressions, names=None): + cdef: + Expression expr + vector[CExpression] c_expressions + vector[c_string] c_names + + for expr in expressions: + c_expressions.push_back(expr.unwrap()) + + if names is not None: + if len(names) != len(expressions): + raise ValueError("dd") + + for name in names: + c_names.push_back(<c_string>tobytes(name)) + + self.wrapped.reset( + new CProjectNodeOptions(c_expressions, c_names) + ) + else: + self.wrapped.reset( + new CProjectNodeOptions(c_expressions) + ) + + +class ProjectNodeOptions(_ProjectNodeOptions): + """ + Make a node which executes expressions on input batches, + producing new batches. + + The "project" operation rearranges, deletes, transforms, and + creates columns. Each output column is computed by evaluating + an expression against the source record batch. + + Parameters + ---------- + expressions : list of pyarrow.compute.Expression + List of expressions to evaluate against the source batch. Review Comment: These must be scalar expressions. That is, every function called in the expression must meet the following criteria:

* The expression is stateless (the value of one row is independent of the value of all other rows)
* The expression returns a value for each input row
* The expression must accept arrays/scalars as inputs and return an array as output

This is probably mostly important when users are creating UDFs (also, it might be nice to mention that UDFs can be used here provided they meet the criteria)
########## python/pyarrow/_acero.pyx: ########## @@ -0,0 +1,459 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License.
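To illustrate the scalar-expression criteria from the comment above, here is a sketch of a UDF that satisfies them, using `pyarrow.compute.register_scalar_function` (the function name and types are made up for illustration):

```python
import pyarrow as pa
import pyarrow.compute as pc

# Stateless, returns one value per input row, and accepts arrays/scalars
# while returning an array -- so it meets the criteria listed above.
def add_one(ctx, x):
    return pc.add(x, 1)

pc.register_scalar_function(
    add_one,
    "add_one",
    {"summary": "add one", "description": "Adds 1 to each input value."},
    {"x": pa.int64()},
    pa.int64(),
)

# Once registered, it behaves like any other compute function:
pc.call_function("add_one", [pa.array([1, 2, 3])])  # -> [2, 3, 4]
```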
+ +# --------------------------------------------------------------------- +# Low-level Acero bindings + +# cython: profile=False +# distutils: language = c++ +# cython: language_level = 3 + +from pyarrow.includes.common cimport * +from pyarrow.includes.libarrow cimport * +from pyarrow.includes.libarrow_dataset cimport * +from pyarrow.lib cimport (Table, check_status, pyarrow_unwrap_table, pyarrow_wrap_table, + RecordBatchReader) +from pyarrow.lib import frombytes, tobytes +from pyarrow._compute cimport Expression, FunctionOptions, _ensure_field_ref, _true +from pyarrow.compute import field + +# Initialize() # Initialise support for Datasets in ExecPlan + + +cdef class ExecNodeOptions(_Weakrefable): + __slots__ = () # avoid mistakingly creating attributes + + cdef const CExecNodeOptions* get_options(self) except NULL: + return self.wrapped.get() + + cdef void init(self, const shared_ptr[CExecNodeOptions]& sp): + self.wrapped = sp + + cdef inline shared_ptr[CExecNodeOptions] unwrap(self): + return self.wrapped + + # def __repr__(self): + # type_name = self.__class__.__name__ + # # Remove {} so we can use our own braces + # string_repr = frombytes(self.get_options().ToString())[1:-1] + # return f"{type_name}({string_repr})" + + # def __eq__(self, FunctionOptions other): + # return self.get_options().Equals(deref(other.get_options())) + + +cdef class _TableSourceNodeOptions(ExecNodeOptions): + + def _set_options(self, Table table): + cdef: + shared_ptr[CTable] c_table + + c_table = pyarrow_unwrap_table(table) + self.wrapped.reset( + new CTableSourceNodeOptions(c_table) + ) + + +class TableSourceNodeOptions(_TableSourceNodeOptions): + """ + A Source node which accepts a table. + + Parameters + ---------- + table : pyarrow.Table + The table which acts as the data source. + """ + + def __init__(self, Table table): + self._set_options(table) + + +cdef class _FilterNodeOptions(ExecNodeOptions): + + def _set_options(self, Expression filter_expression not None): + self.wrapped.reset( + new CFilterNodeOptions(<CExpression>filter_expression.unwrap()) + ) + + +class FilterNodeOptions(_FilterNodeOptions): + """ + Make a node which excludes some rows from batches passed through it. + + The "filter" operation provides an option to define data filtering + criteria. It selects rows matching a given expression. Filters can + be written using pyarrow.compute.Expression. + + Parameters + ---------- + filter_expression : pyarrow.compute.Expression + + """ + + def __init__(self, Expression filter_expression): + self._set_options(filter_expression) + + +cdef class _ProjectNodeOptions(ExecNodeOptions): + + def _set_options(self, expressions, names=None): + cdef: + Expression expr + vector[CExpression] c_expressions + vector[c_string] c_names + + for expr in expressions: + c_expressions.push_back(expr.unwrap()) + + if names is not None: + if len(names) != len(expressions): + raise ValueError("dd") + + for name in names: + c_names.push_back(<c_string>tobytes(name)) + + self.wrapped.reset( + new CProjectNodeOptions(c_expressions, c_names) + ) + else: + self.wrapped.reset( + new CProjectNodeOptions(c_expressions) + ) + + +class ProjectNodeOptions(_ProjectNodeOptions): + """ + Make a node which executes expressions on input batches, + producing new batches. + + The "project" operation rearranges, deletes, transforms, and + creates columns. Each output column is computed by evaluating + an expression against the source record batch. 
+ + Parameters + ---------- + expressions : list of pyarrow.compute.Expression + List of expressions to evaluate against the source batch. + names : list of str + List of names for each of the ouptut columns (same length as + `expressions`). If `names` is not provided, the string + representations of exprs will be used. + """ + + def __init__(self, expressions, names=None): + self._set_options(expressions, names) + + +cdef class _AggregateNodeOptions(ExecNodeOptions): + + def _set_options(self, aggregates, keys=None): + cdef: + CAggregate c_aggr + vector[CAggregate] c_aggregations + vector[CFieldRef] c_keys + + for arg_names, func_name, opts, name in aggregates: + c_aggr.function = tobytes(func_name) + if opts is not None: + c_aggr.options = (<FunctionOptions?>opts).wrapped + else: + c_aggr.options = <shared_ptr[CFunctionOptions]>nullptr + for arg in arg_names: + c_aggr.target.push_back(_ensure_field_ref(arg)) + c_aggr.name = tobytes(name) + + c_aggregations.push_back(move(c_aggr)) + + if keys is None: + keys = [] + for name in keys: + c_keys.push_back(_ensure_field_ref(name)) + + self.wrapped.reset( + new CAggregateNodeOptions(c_aggregations, c_keys) + ) + + +class AggregateNodeOptions(_AggregateNodeOptions): + """ + Make a node which aggregates input batches, optionally grouped by keys. + + Acero supports two types of aggregates: "scalar" aggregates, + and "hash" aggregates. Scalar aggregates reduce an array or scalar + input to a single scalar output (e.g. computing the mean of a column). + Hash aggregates act like GROUP BY in SQL and first partition data + based on one or more key columns, then reduce the data in each partition. + The aggregate node supports both types of computation, and can compute + any number of aggregations at once. + + Parameters + ---------- + aggregates : list of tuples + Aggregations which will be applied to the targetted fields. + Specified as a list of tuples, where each tuple is one aggregation + specification and consists of: aggregation column name followed + by function name, aggregation function options object and the + output field name. + The column name can be a string, an empty list or a list of + column names, for unary, nullary and n-ary aggregation functions + respectively. + keys : list, optional Review Comment: A list of what? ########## python/pyarrow/_acero.pyx: ########## @@ -0,0 +1,459 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
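As a concrete illustration of the aggregate tuple format described in the docstring above (column and output names are hypothetical; with group-by keys, the "hash_" variants of the aggregate functions are the ones that apply):

```python
# Each tuple is: (target column(s), function name, options, output name).
aggregates = [
    (["a"], "hash_sum", None, "sum_a"),       # unary: one target column
    ([], "hash_count_all", None, "n_rows"),   # nullary: no target column
]
# Per the docstring, a plain string is also accepted for a single column.
keys = ["b"]  # aggregations are grouped by column "b"
```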
+ +# --------------------------------------------------------------------- +# Low-level Acero bindings + +# cython: profile=False +# distutils: language = c++ +# cython: language_level = 3 + +from pyarrow.includes.common cimport * +from pyarrow.includes.libarrow cimport * +from pyarrow.includes.libarrow_dataset cimport * +from pyarrow.lib cimport (Table, check_status, pyarrow_unwrap_table, pyarrow_wrap_table, + RecordBatchReader) +from pyarrow.lib import frombytes, tobytes +from pyarrow._compute cimport Expression, FunctionOptions, _ensure_field_ref, _true +from pyarrow.compute import field + +# Initialize() # Initialise support for Datasets in ExecPlan + + +cdef class ExecNodeOptions(_Weakrefable): + __slots__ = () # avoid mistakingly creating attributes + + cdef const CExecNodeOptions* get_options(self) except NULL: + return self.wrapped.get() + + cdef void init(self, const shared_ptr[CExecNodeOptions]& sp): + self.wrapped = sp + + cdef inline shared_ptr[CExecNodeOptions] unwrap(self): + return self.wrapped + + # def __repr__(self): + # type_name = self.__class__.__name__ + # # Remove {} so we can use our own braces + # string_repr = frombytes(self.get_options().ToString())[1:-1] + # return f"{type_name}({string_repr})" + + # def __eq__(self, FunctionOptions other): + # return self.get_options().Equals(deref(other.get_options())) + + +cdef class _TableSourceNodeOptions(ExecNodeOptions): + + def _set_options(self, Table table): + cdef: + shared_ptr[CTable] c_table + + c_table = pyarrow_unwrap_table(table) + self.wrapped.reset( + new CTableSourceNodeOptions(c_table) + ) + + +class TableSourceNodeOptions(_TableSourceNodeOptions): + """ + A Source node which accepts a table. + + Parameters + ---------- + table : pyarrow.Table + The table which acts as the data source. + """ + + def __init__(self, Table table): + self._set_options(table) + + +cdef class _FilterNodeOptions(ExecNodeOptions): + + def _set_options(self, Expression filter_expression not None): + self.wrapped.reset( + new CFilterNodeOptions(<CExpression>filter_expression.unwrap()) + ) + + +class FilterNodeOptions(_FilterNodeOptions): + """ + Make a node which excludes some rows from batches passed through it. + + The "filter" operation provides an option to define data filtering + criteria. It selects rows matching a given expression. Filters can + be written using pyarrow.compute.Expression. + + Parameters + ---------- + filter_expression : pyarrow.compute.Expression + + """ + + def __init__(self, Expression filter_expression): + self._set_options(filter_expression) + + +cdef class _ProjectNodeOptions(ExecNodeOptions): + + def _set_options(self, expressions, names=None): + cdef: + Expression expr + vector[CExpression] c_expressions + vector[c_string] c_names + + for expr in expressions: + c_expressions.push_back(expr.unwrap()) + + if names is not None: + if len(names) != len(expressions): + raise ValueError("dd") + + for name in names: + c_names.push_back(<c_string>tobytes(name)) + + self.wrapped.reset( + new CProjectNodeOptions(c_expressions, c_names) + ) + else: + self.wrapped.reset( + new CProjectNodeOptions(c_expressions) + ) + + +class ProjectNodeOptions(_ProjectNodeOptions): + """ + Make a node which executes expressions on input batches, + producing new batches. + + The "project" operation rearranges, deletes, transforms, and + creates columns. Each output column is computed by evaluating + an expression against the source record batch. 
+ + Parameters + ---------- + expressions : list of pyarrow.compute.Expression + List of expressions to evaluate against the source batch. + names : list of str + List of names for each of the ouptut columns (same length as + `expressions`). If `names` is not provided, the string + representations of exprs will be used. + """ Review Comment: The output of this node is not clear, though it may be intuitive enough. This node will output batches with the same length as the input batches that have one column for each expression. ########## python/pyarrow/_acero.pyx: ########## @@ -0,0 +1,459 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# --------------------------------------------------------------------- +# Low-level Acero bindings + +# cython: profile=False +# distutils: language = c++ +# cython: language_level = 3 + +from pyarrow.includes.common cimport * +from pyarrow.includes.libarrow cimport * +from pyarrow.includes.libarrow_dataset cimport * +from pyarrow.lib cimport (Table, check_status, pyarrow_unwrap_table, pyarrow_wrap_table, + RecordBatchReader) +from pyarrow.lib import frombytes, tobytes +from pyarrow._compute cimport Expression, FunctionOptions, _ensure_field_ref, _true +from pyarrow.compute import field + +# Initialize() # Initialise support for Datasets in ExecPlan + + +cdef class ExecNodeOptions(_Weakrefable): + __slots__ = () # avoid mistakingly creating attributes + + cdef const CExecNodeOptions* get_options(self) except NULL: + return self.wrapped.get() + + cdef void init(self, const shared_ptr[CExecNodeOptions]& sp): + self.wrapped = sp + + cdef inline shared_ptr[CExecNodeOptions] unwrap(self): + return self.wrapped + + # def __repr__(self): + # type_name = self.__class__.__name__ + # # Remove {} so we can use our own braces + # string_repr = frombytes(self.get_options().ToString())[1:-1] + # return f"{type_name}({string_repr})" + + # def __eq__(self, FunctionOptions other): + # return self.get_options().Equals(deref(other.get_options())) + + +cdef class _TableSourceNodeOptions(ExecNodeOptions): + + def _set_options(self, Table table): + cdef: + shared_ptr[CTable] c_table + + c_table = pyarrow_unwrap_table(table) + self.wrapped.reset( + new CTableSourceNodeOptions(c_table) + ) + + +class TableSourceNodeOptions(_TableSourceNodeOptions): + """ + A Source node which accepts a table. + + Parameters + ---------- + table : pyarrow.Table + The table which acts as the data source. 
+ """ + + def __init__(self, Table table): + self._set_options(table) + + +cdef class _FilterNodeOptions(ExecNodeOptions): + + def _set_options(self, Expression filter_expression not None): + self.wrapped.reset( + new CFilterNodeOptions(<CExpression>filter_expression.unwrap()) + ) + + +class FilterNodeOptions(_FilterNodeOptions): + """ + Make a node which excludes some rows from batches passed through it. + + The "filter" operation provides an option to define data filtering + criteria. It selects rows matching a given expression. Filters can + be written using pyarrow.compute.Expression. + + Parameters + ---------- + filter_expression : pyarrow.compute.Expression + + """ + + def __init__(self, Expression filter_expression): + self._set_options(filter_expression) + + +cdef class _ProjectNodeOptions(ExecNodeOptions): + + def _set_options(self, expressions, names=None): + cdef: + Expression expr + vector[CExpression] c_expressions + vector[c_string] c_names + + for expr in expressions: + c_expressions.push_back(expr.unwrap()) + + if names is not None: + if len(names) != len(expressions): + raise ValueError("dd") + + for name in names: + c_names.push_back(<c_string>tobytes(name)) + + self.wrapped.reset( + new CProjectNodeOptions(c_expressions, c_names) + ) + else: + self.wrapped.reset( + new CProjectNodeOptions(c_expressions) + ) + + +class ProjectNodeOptions(_ProjectNodeOptions): + """ + Make a node which executes expressions on input batches, + producing new batches. + + The "project" operation rearranges, deletes, transforms, and + creates columns. Each output column is computed by evaluating Review Comment: "transform" and "create" is perhaps redundant. Although I don't know if it really matters. You could say you "transformed the column from int to string" even though what really happened is "you created a string column from the int column and dropped the int column." ########## python/pyarrow/_acero.pyx: ########## @@ -0,0 +1,459 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +# --------------------------------------------------------------------- +# Low-level Acero bindings + +# cython: profile=False +# distutils: language = c++ +# cython: language_level = 3 + +from pyarrow.includes.common cimport * +from pyarrow.includes.libarrow cimport * +from pyarrow.includes.libarrow_dataset cimport * +from pyarrow.lib cimport (Table, check_status, pyarrow_unwrap_table, pyarrow_wrap_table, + RecordBatchReader) +from pyarrow.lib import frombytes, tobytes +from pyarrow._compute cimport Expression, FunctionOptions, _ensure_field_ref, _true +from pyarrow.compute import field + +# Initialize() # Initialise support for Datasets in ExecPlan + + +cdef class ExecNodeOptions(_Weakrefable): + __slots__ = () # avoid mistakingly creating attributes + + cdef const CExecNodeOptions* get_options(self) except NULL: + return self.wrapped.get() + + cdef void init(self, const shared_ptr[CExecNodeOptions]& sp): + self.wrapped = sp + + cdef inline shared_ptr[CExecNodeOptions] unwrap(self): + return self.wrapped + + # def __repr__(self): + # type_name = self.__class__.__name__ + # # Remove {} so we can use our own braces + # string_repr = frombytes(self.get_options().ToString())[1:-1] + # return f"{type_name}({string_repr})" + + # def __eq__(self, FunctionOptions other): + # return self.get_options().Equals(deref(other.get_options())) + + +cdef class _TableSourceNodeOptions(ExecNodeOptions): + + def _set_options(self, Table table): + cdef: + shared_ptr[CTable] c_table + + c_table = pyarrow_unwrap_table(table) + self.wrapped.reset( + new CTableSourceNodeOptions(c_table) + ) + + +class TableSourceNodeOptions(_TableSourceNodeOptions): + """ + A Source node which accepts a table. + + Parameters + ---------- + table : pyarrow.Table + The table which acts as the data source. + """ + + def __init__(self, Table table): + self._set_options(table) + + +cdef class _FilterNodeOptions(ExecNodeOptions): + + def _set_options(self, Expression filter_expression not None): + self.wrapped.reset( + new CFilterNodeOptions(<CExpression>filter_expression.unwrap()) + ) + + +class FilterNodeOptions(_FilterNodeOptions): + """ + Make a node which excludes some rows from batches passed through it. + + The "filter" operation provides an option to define data filtering + criteria. It selects rows matching a given expression. Filters can + be written using pyarrow.compute.Expression. + + Parameters + ---------- + filter_expression : pyarrow.compute.Expression + + """ + + def __init__(self, Expression filter_expression): + self._set_options(filter_expression) + + +cdef class _ProjectNodeOptions(ExecNodeOptions): + + def _set_options(self, expressions, names=None): + cdef: + Expression expr + vector[CExpression] c_expressions + vector[c_string] c_names + + for expr in expressions: + c_expressions.push_back(expr.unwrap()) + + if names is not None: + if len(names) != len(expressions): + raise ValueError("dd") + + for name in names: + c_names.push_back(<c_string>tobytes(name)) + + self.wrapped.reset( + new CProjectNodeOptions(c_expressions, c_names) + ) + else: + self.wrapped.reset( + new CProjectNodeOptions(c_expressions) + ) + + +class ProjectNodeOptions(_ProjectNodeOptions): + """ + Make a node which executes expressions on input batches, + producing new batches. + + The "project" operation rearranges, deletes, transforms, and + creates columns. Each output column is computed by evaluating + an expression against the source record batch. 
+ + Parameters + ---------- + expressions : list of pyarrow.compute.Expression + List of expressions to evaluate against the source batch. + names : list of str + List of names for each of the ouptut columns (same length as + `expressions`). If `names` is not provided, the string + representations of exprs will be used. + """ + + def __init__(self, expressions, names=None): + self._set_options(expressions, names) + + +cdef class _AggregateNodeOptions(ExecNodeOptions): + + def _set_options(self, aggregates, keys=None): + cdef: + CAggregate c_aggr + vector[CAggregate] c_aggregations + vector[CFieldRef] c_keys + + for arg_names, func_name, opts, name in aggregates: + c_aggr.function = tobytes(func_name) + if opts is not None: + c_aggr.options = (<FunctionOptions?>opts).wrapped + else: + c_aggr.options = <shared_ptr[CFunctionOptions]>nullptr + for arg in arg_names: + c_aggr.target.push_back(_ensure_field_ref(arg)) + c_aggr.name = tobytes(name) + + c_aggregations.push_back(move(c_aggr)) + + if keys is None: + keys = [] + for name in keys: + c_keys.push_back(_ensure_field_ref(name)) + + self.wrapped.reset( + new CAggregateNodeOptions(c_aggregations, c_keys) + ) + + +class AggregateNodeOptions(_AggregateNodeOptions): + """ + Make a node which aggregates input batches, optionally grouped by keys. + + Acero supports two types of aggregates: "scalar" aggregates, + and "hash" aggregates. Scalar aggregates reduce an array or scalar + input to a single scalar output (e.g. computing the mean of a column). + Hash aggregates act like GROUP BY in SQL and first partition data + based on one or more key columns, then reduce the data in each partition. + The aggregate node supports both types of computation, and can compute + any number of aggregations at once. + + Parameters + ---------- + aggregates : list of tuples + Aggregations which will be applied to the targetted fields. + Specified as a list of tuples, where each tuple is one aggregation + specification and consists of: aggregation column name followed + by function name, aggregation function options object and the + output field name. + The column name can be a string, an empty list or a list of + column names, for unary, nullary and n-ary aggregation functions + respectively. + keys : list, optional + Keys by which aggregations will be grouped. 
+ + """ + + def __init__(self, aggregates, keys=None): + self._set_options(aggregates, keys) + + +cdef class _HashJoinNodeOptions(ExecNodeOptions): + # _join_type_map = { + # "left semi": CJoinType_LEFT_SEMI, + # "right semi": CJoinType_RIGHT_SEMI, + # "left anti": CJoinType_LEFT_ANTI, + # "right anti": CJoinType_RIGHT_ANTI, + # "inner": CJoinType_INNER, + # "left outer": CJoinType_LEFT_OUTER, + # "right outer": CJoinType_RIGHT_OUTER, + # "full outer": CJoinType_FULL_OUTER, + # } + + def _set_options( + self, join_type, left_keys, right_keys, left_output=None, right_output=None, + output_suffix_for_left="", output_suffix_for_right="", + ): + cdef: + CJoinType c_join_type + vector[CFieldRef] c_left_keys + vector[CFieldRef] c_right_keys + vector[CFieldRef] c_left_output + vector[CFieldRef] c_right_output + + # join type + # try: + # c_join_type = self._join_type_map[join_type] + # except KeyError: + # raise ValueError("Unsupported join type") + if join_type == "left semi": + c_join_type = CJoinType_LEFT_SEMI + elif join_type == "right semi": + c_join_type = CJoinType_RIGHT_SEMI + elif join_type == "left anti": + c_join_type = CJoinType_LEFT_ANTI + elif join_type == "right anti": + c_join_type = CJoinType_RIGHT_ANTI + elif join_type == "inner": + c_join_type = CJoinType_INNER + elif join_type == "left outer": + c_join_type = CJoinType_LEFT_OUTER + elif join_type == "right outer": + c_join_type = CJoinType_RIGHT_OUTER + elif join_type == "full outer": + c_join_type = CJoinType_FULL_OUTER + else: + raise ValueError("Unsupported join type") + + # left/right keys + if isinstance(left_keys, str): + left_keys = [left_keys] + for key in left_keys: + c_left_keys.push_back(_ensure_field_ref(key)) + if isinstance(right_keys, str): + right_keys = [right_keys] + for key in right_keys: + c_right_keys.push_back(_ensure_field_ref(key)) + + # left/right output fields + if left_output is not None and right_output is not None: + for colname in left_output: + c_left_output.push_back(_ensure_field_ref(colname)) + for colname in right_output: + c_right_output.push_back(_ensure_field_ref(colname)) + + self.wrapped.reset( + new CHashJoinNodeOptions( + c_join_type, c_left_keys, c_right_keys, + c_left_output, c_right_output, + _true, + <c_string>tobytes(output_suffix_for_left), + <c_string>tobytes(output_suffix_for_right) + ) + ) + else: + self.wrapped.reset( + new CHashJoinNodeOptions( + c_join_type, c_left_keys, c_right_keys, + _true, + <c_string>tobytes(output_suffix_for_left), + <c_string>tobytes(output_suffix_for_right) + ) + ) + + +class HashJoinNodeOptions(_HashJoinNodeOptions): + """ + Make a node which implements join operation using hash join strategy. Review Comment: This seems a bit sparse. Maybe... Make a node which joins two inputs together based on common keys. Each row from the left input will be matched with rows from the right input where the keys are equal. Null keys do not match other null keys. If any key is null then that row will not match. Which columns are output, and the behavior of rows that do not match, will depend on the join type. inner: for each row in the left input there will be one output row per matching row in the right input. If there are no matching rows in the right input then there will be no output row for that input row. The output columns will be the left_keys, followed by the right_keys (these will be identical to the left_keys), followed by the left_output, followed by the right_output. left outer: behaves the same as an inner join. 
However, if there are no matching rows in the right input then there will be one output row that contains null for each column in right_keys and right_output. A row in the right input, that has no matching row in the left input, will not be output. The output columns will be identical to an inner join. right outer: for each row in the right input there will be one output row per matching row in the left input. If there are no matching rows in the left input then there will be one output row containing null in left_keys and left_output. A row in the left input, that has no matching row in the right input, will not be output. The output columns will be identical to an inner join. full outer: behaves the same as a left outer join. However, if there is a row in the right input, that has no matching row in the left input, then there will be one output row containing null for each column in left_keys and left_output. The output columns will be identical to an inner join. left semi: for each row in the left input there will be exactly one output row, if and only if, there is a matching row in the right input. The output columns will be the left_keys followed by the left_output. right semi: for each row in the right input there will be exactly one output row, if and only if, there is a matching row in the left input. The output columns will be the right_keys followed by the right_output. left anti: for each row in the left input there will be exactly one output row, if and only if, there is not a matching row in the right input. The output will be the same as left semi. right anti: for each row in the right input there will be exactly one output row, if and only if, there is not a matching row in the left input. The output will be the same as right semi. ########## python/pyarrow/_acero.pyx: ########## @@ -0,0 +1,459 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
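A worked example of the "left outer" semantics described above, using the existing `pyarrow.Table.join` API, which is backed by the same hash join (row order in the raw result is not guaranteed, hence the sort):

```python
import pyarrow as pa

left = pa.table({"k": [1, 2], "x": ["a", "b"]})
right = pa.table({"k": [2, 3], "y": ["c", "d"]})

# k=1 has no match on the right, so "y" is null for that row;
# the right-only row k=3 does not appear in the output at all.
joined = left.join(right, keys="k", join_type="left outer").sort_by("k")
joined.to_pydict()
# {'k': [1, 2], 'x': ['a', 'b'], 'y': [None, 'c']}
```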
+ +# --------------------------------------------------------------------- +# Low-level Acero bindings + +# cython: profile=False +# distutils: language = c++ +# cython: language_level = 3 + +from pyarrow.includes.common cimport * +from pyarrow.includes.libarrow cimport * +from pyarrow.includes.libarrow_dataset cimport * +from pyarrow.lib cimport (Table, check_status, pyarrow_unwrap_table, pyarrow_wrap_table, + RecordBatchReader) +from pyarrow.lib import frombytes, tobytes +from pyarrow._compute cimport Expression, FunctionOptions, _ensure_field_ref, _true +from pyarrow.compute import field + +# Initialize() # Initialise support for Datasets in ExecPlan + + +cdef class ExecNodeOptions(_Weakrefable): + __slots__ = () # avoid mistakingly creating attributes + + cdef const CExecNodeOptions* get_options(self) except NULL: + return self.wrapped.get() + + cdef void init(self, const shared_ptr[CExecNodeOptions]& sp): + self.wrapped = sp + + cdef inline shared_ptr[CExecNodeOptions] unwrap(self): + return self.wrapped + + # def __repr__(self): + # type_name = self.__class__.__name__ + # # Remove {} so we can use our own braces + # string_repr = frombytes(self.get_options().ToString())[1:-1] + # return f"{type_name}({string_repr})" + + # def __eq__(self, FunctionOptions other): + # return self.get_options().Equals(deref(other.get_options())) + + +cdef class _TableSourceNodeOptions(ExecNodeOptions): + + def _set_options(self, Table table): + cdef: + shared_ptr[CTable] c_table + + c_table = pyarrow_unwrap_table(table) + self.wrapped.reset( + new CTableSourceNodeOptions(c_table) + ) + + +class TableSourceNodeOptions(_TableSourceNodeOptions): + """ + A Source node which accepts a table. + + Parameters + ---------- + table : pyarrow.Table + The table which acts as the data source. + """ + + def __init__(self, Table table): + self._set_options(table) + + +cdef class _FilterNodeOptions(ExecNodeOptions): + + def _set_options(self, Expression filter_expression not None): + self.wrapped.reset( + new CFilterNodeOptions(<CExpression>filter_expression.unwrap()) + ) + + +class FilterNodeOptions(_FilterNodeOptions): + """ + Make a node which excludes some rows from batches passed through it. + + The "filter" operation provides an option to define data filtering + criteria. It selects rows matching a given expression. Filters can + be written using pyarrow.compute.Expression. + + Parameters + ---------- + filter_expression : pyarrow.compute.Expression + + """ + + def __init__(self, Expression filter_expression): + self._set_options(filter_expression) + + +cdef class _ProjectNodeOptions(ExecNodeOptions): + + def _set_options(self, expressions, names=None): + cdef: + Expression expr + vector[CExpression] c_expressions + vector[c_string] c_names + + for expr in expressions: + c_expressions.push_back(expr.unwrap()) + + if names is not None: + if len(names) != len(expressions): + raise ValueError("dd") + + for name in names: + c_names.push_back(<c_string>tobytes(name)) + + self.wrapped.reset( + new CProjectNodeOptions(c_expressions, c_names) + ) + else: + self.wrapped.reset( + new CProjectNodeOptions(c_expressions) + ) + + +class ProjectNodeOptions(_ProjectNodeOptions): + """ + Make a node which executes expressions on input batches, + producing new batches. + + The "project" operation rearranges, deletes, transforms, and + creates columns. Each output column is computed by evaluating + an expression against the source record batch. 
+ + Parameters + ---------- + expressions : list of pyarrow.compute.Expression + List of expressions to evaluate against the source batch. + names : list of str + List of names for each of the ouptut columns (same length as + `expressions`). If `names` is not provided, the string + representations of exprs will be used. + """ + + def __init__(self, expressions, names=None): + self._set_options(expressions, names) + + +cdef class _AggregateNodeOptions(ExecNodeOptions): + + def _set_options(self, aggregates, keys=None): + cdef: + CAggregate c_aggr + vector[CAggregate] c_aggregations + vector[CFieldRef] c_keys + + for arg_names, func_name, opts, name in aggregates: + c_aggr.function = tobytes(func_name) + if opts is not None: + c_aggr.options = (<FunctionOptions?>opts).wrapped + else: + c_aggr.options = <shared_ptr[CFunctionOptions]>nullptr + for arg in arg_names: + c_aggr.target.push_back(_ensure_field_ref(arg)) + c_aggr.name = tobytes(name) + + c_aggregations.push_back(move(c_aggr)) + + if keys is None: + keys = [] + for name in keys: + c_keys.push_back(_ensure_field_ref(name)) + + self.wrapped.reset( + new CAggregateNodeOptions(c_aggregations, c_keys) + ) + + +class AggregateNodeOptions(_AggregateNodeOptions): + """ + Make a node which aggregates input batches, optionally grouped by keys. + + Acero supports two types of aggregates: "scalar" aggregates, + and "hash" aggregates. Scalar aggregates reduce an array or scalar + input to a single scalar output (e.g. computing the mean of a column). + Hash aggregates act like GROUP BY in SQL and first partition data + based on one or more key columns, then reduce the data in each partition. + The aggregate node supports both types of computation, and can compute + any number of aggregations at once. + + Parameters + ---------- + aggregates : list of tuples + Aggregations which will be applied to the targetted fields. + Specified as a list of tuples, where each tuple is one aggregation + specification and consists of: aggregation column name followed + by function name, aggregation function options object and the + output field name. + The column name can be a string, an empty list or a list of + column names, for unary, nullary and n-ary aggregation functions + respectively. + keys : list, optional + Keys by which aggregations will be grouped. 
+ + """ + + def __init__(self, aggregates, keys=None): + self._set_options(aggregates, keys) + + +cdef class _HashJoinNodeOptions(ExecNodeOptions): + # _join_type_map = { + # "left semi": CJoinType_LEFT_SEMI, + # "right semi": CJoinType_RIGHT_SEMI, + # "left anti": CJoinType_LEFT_ANTI, + # "right anti": CJoinType_RIGHT_ANTI, + # "inner": CJoinType_INNER, + # "left outer": CJoinType_LEFT_OUTER, + # "right outer": CJoinType_RIGHT_OUTER, + # "full outer": CJoinType_FULL_OUTER, + # } + + def _set_options( + self, join_type, left_keys, right_keys, left_output=None, right_output=None, + output_suffix_for_left="", output_suffix_for_right="", + ): + cdef: + CJoinType c_join_type + vector[CFieldRef] c_left_keys + vector[CFieldRef] c_right_keys + vector[CFieldRef] c_left_output + vector[CFieldRef] c_right_output + + # join type + # try: + # c_join_type = self._join_type_map[join_type] + # except KeyError: + # raise ValueError("Unsupported join type") + if join_type == "left semi": + c_join_type = CJoinType_LEFT_SEMI + elif join_type == "right semi": + c_join_type = CJoinType_RIGHT_SEMI + elif join_type == "left anti": + c_join_type = CJoinType_LEFT_ANTI + elif join_type == "right anti": + c_join_type = CJoinType_RIGHT_ANTI + elif join_type == "inner": + c_join_type = CJoinType_INNER + elif join_type == "left outer": + c_join_type = CJoinType_LEFT_OUTER + elif join_type == "right outer": + c_join_type = CJoinType_RIGHT_OUTER + elif join_type == "full outer": + c_join_type = CJoinType_FULL_OUTER + else: + raise ValueError("Unsupported join type") + + # left/right keys + if isinstance(left_keys, str): + left_keys = [left_keys] + for key in left_keys: + c_left_keys.push_back(_ensure_field_ref(key)) + if isinstance(right_keys, str): + right_keys = [right_keys] + for key in right_keys: + c_right_keys.push_back(_ensure_field_ref(key)) + + # left/right output fields + if left_output is not None and right_output is not None: + for colname in left_output: + c_left_output.push_back(_ensure_field_ref(colname)) + for colname in right_output: + c_right_output.push_back(_ensure_field_ref(colname)) + + self.wrapped.reset( + new CHashJoinNodeOptions( + c_join_type, c_left_keys, c_right_keys, + c_left_output, c_right_output, + _true, + <c_string>tobytes(output_suffix_for_left), + <c_string>tobytes(output_suffix_for_right) + ) + ) + else: + self.wrapped.reset( + new CHashJoinNodeOptions( + c_join_type, c_left_keys, c_right_keys, + _true, + <c_string>tobytes(output_suffix_for_left), + <c_string>tobytes(output_suffix_for_right) + ) + ) + + +class HashJoinNodeOptions(_HashJoinNodeOptions): + """ + Make a node which implements join operation using hash join strategy. + + Parameters + ---------- + join_type : str + Type of join. One of "left semi", "right semi", "left anti", + "right anti", "inner", "left outer", "right outer", "full outer". + left_keys + Key fields from left input. + right_keys + key fields from right input. + left_output + Output fields passed from left input. If left and right output + fields are not specified, all valid fields from both left and + right input will be output + right_output + Output fields passed from right input. If left and right output + fields are not specified, all valid fields from both left and + right input will be output + output_suffix_for_left : str + Suffix added to names of output fields coming from left input + (used to distinguish, if necessary, between fields of the same + name in left and right input and can be left empty if there are + no name collisions). 
+    output_suffix_for_right : str
+        Suffix added to names of output fields coming from right input,
+        see `output_suffix_for_left`.
+    """
+
+    def __init__(
+        self, join_type, left_keys, right_keys, left_output=None, right_output=None,
+        output_suffix_for_left="", output_suffix_for_right=""
+    ):
+        self._set_options(
+            join_type, left_keys, right_keys, left_output, right_output,
+            output_suffix_for_left, output_suffix_for_right
+        )
+
+
+cdef class Declaration(_Weakrefable):
+    """
+    Helper class for declaring the nodes of an ExecPlan.
+
+    A Declaration represents an unconstructed ExecNode, and potentially
+    more than one, since its inputs may themselves be Declarations, and
+    several Declarations can be combined into one with ``from_sequence``.
+
+    Parameters
+    ----------
+    factory_name : str
+        The ExecNode factory name, such as "table_source", "filter",
+        "project" etc.
+    options : ExecNodeOptions
+        Corresponding ExecNodeOptions subclass (matching the factory name).
+    inputs : list of Declaration, optional
+        Input nodes for this declaration. Optional if the node is a source
+        node, or when the declaration gets combined later with
+        `from_sequence`.
+
+    Returns
+    -------
+    Declaration
+    """
+    cdef void init(self, const CDeclaration& c_decl):
+        self.decl = c_decl
+
+    @staticmethod
+    cdef wrap(const CDeclaration& c_decl):
+        cdef Declaration self = Declaration.__new__(Declaration)
+        self.init(c_decl)
+        return self
+
+    cdef inline CDeclaration unwrap(self) nogil:
+        return self.decl
+
+    def __init__(self, factory_name, ExecNodeOptions options, inputs=None):
+        cdef:
+            c_string c_factory_name
+            CDeclaration c_decl
+            vector[CDeclaration.Input] c_inputs
+
+        c_factory_name = tobytes(factory_name)
+
+        if inputs is not None:
+            for ipt in inputs:
+                c_inputs.push_back(
+                    CDeclaration.Input((<Declaration>ipt).unwrap())
+                )
+
+        c_decl = CDeclaration(c_factory_name, c_inputs, options.unwrap())
+        self.init(c_decl)
+
+    @staticmethod
+    def from_sequence(decls):
+        """
+        Convenience factory for the common case of a simple sequence of nodes.
+
+        Each of the declarations will be appended to the inputs of the
+        subsequent declaration, and the final modified declaration will
+        be returned.
+
+        Parameters
+        ----------
+        decls : list of Declaration
+
+        Returns
+        -------
+        Declaration
+        """
+        cdef:
+            vector[CDeclaration] c_decls
+            CDeclaration c_decl
+
+        for decl in decls:
+            c_decls.push_back((<Declaration> decl).unwrap())
+
+        c_decl = CDeclaration.Sequence(c_decls)
+        return Declaration.wrap(c_decl)
+
+    def __str__(self):
+        return frombytes(GetResultValue(DeclarationToString(self.decl)))
+
+    def __repr__(self):
+        return "<pyarrow.acero.Declaration>\n{0}".format(str(self))
+
+    def to_table(self, use_threads=True):
+        """
+        Run the declaration and collect the results into a table.
+
+        This method will implicitly add a sink node to the declaration
Review Comment: Do users care about the existence of a sink node? I suppose they might if they are creating custom nodes and aware of such things.
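From the user's side the sink should stay invisible in the common path; a minimal sketch of that path (`table` here is just a placeholder `pyarrow.Table`, and the import path assumes this module as laid out in the PR):

```python
import pyarrow as pa
from pyarrow._acero import Declaration, TableSourceNodeOptions

table = pa.table({"a": [1, 2, 3]})

decl = Declaration("table_source", TableSourceNodeOptions(table))
result = decl.to_table()  # the sink node is added implicitly by to_table()
```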
########## python/pyarrow/_acero.pyx: ########## @@ -0,0 +1,459 @@
+cdef class _ProjectNodeOptions(ExecNodeOptions):
+
+    def _set_options(self, expressions, names=None):
+        cdef:
+            Expression expr
+            vector[CExpression] c_expressions
+            vector[c_string] c_names
+
+        for expr in expressions:
+            c_expressions.push_back(expr.unwrap())
+
+        if names is not None:
+            if len(names) != len(expressions):
+                raise ValueError("dd")
Review Comment: todo? Also, we should generally check things like this when the plan is created.
Though I'm not really opposed to also checking them in python (and it seems like this particular check is missing in C++ :grimacing:)
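If the Python-side check stays, the placeholder message presumably just needs spelling out; e.g. something along these lines:

```python
if names is not None:
    if len(names) != len(expressions):
        raise ValueError(
            "names must have the same length as expressions"
        )
```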
########## python/pyarrow/_acero.pyx: ########## @@ -0,0 +1,459 @@
+class AggregateNodeOptions(_AggregateNodeOptions):
+    """
+    Make a node which aggregates input batches, optionally grouped by keys.
+
+    Parameters
+    ----------
+    aggregates : list of tuples
+        Aggregations which will be applied to the targeted fields.
+        Specified as a list of tuples, where each tuple is one aggregation
+        specification and consists of: the target column(s), the aggregation
+        function name, the aggregation function options object (or None),
+        and the output field name.
+        The target can be a single column name (for unary functions), an
+        empty list (for nullary functions) or a list of column names (for
+        n-ary functions).
Review Comment: I don't know if it helps to have some examples here. Similar to group_by I suspect this will be a little confusing.
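Agreed, an example would probably help; a sketch of what it could show (assuming the usual ``hash_``-prefixed kernels for grouped aggregations, with hypothetical column names):

```python
# One unary aggregation on column "b" plus a nullary row count,
# both grouped by the key column "a":
aggregates = [
    ("b", "hash_sum", None, "b_sum"),        # target is a single column name
    ([], "hash_count_all", None, "n_rows"),  # nullary: empty target list
]
options = AggregateNodeOptions(aggregates, keys=["a"])
```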
########## python/pyarrow/_acero.pyx: ########## @@ -0,0 +1,459 @@
+class ProjectNodeOptions(_ProjectNodeOptions):
+    """
+    Make a node which executes expressions on input batches,
+    producing new batches.
Review Comment: ```suggestion producing new columns. ```
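To make the projection/rename behaviour concrete, a quick sketch (column names hypothetical):

```python
from pyarrow.compute import field

# Keep column "a" as-is and pass "b" through under a new name:
options = ProjectNodeOptions([field("a"), field("b")], ["a", "b_renamed"])
```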
########## python/pyarrow/_acero.pyx: ########## @@ -0,0 +1,459 @@
+class HashJoinNodeOptions(_HashJoinNodeOptions):
+    """
+    Make a node which implements a join operation using the hash join strategy.
+
+    Parameters
+    ----------
+    join_type : str
+        Type of join. One of "left semi", "right semi", "left anti",
+        "right anti", "inner", "left outer", "right outer", "full outer".
Review Comment: Is there a default? I wonder if it makes sense to default to `inner`?
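For reference, what explicit usage looks like as currently written (key and suffix names hypothetical):

```python
# An inner join on "id"; the suffixes disambiguate colliding output names:
options = HashJoinNodeOptions(
    "inner", left_keys="id", right_keys="id",
    output_suffix_for_left="_l", output_suffix_for_right="_r",
)
```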
########## python/pyarrow/_acero.pyx: ########## @@ -0,0 +1,459 @@
+cdef class Declaration(_Weakrefable):
+    """
+    Helper class for declaring the nodes of an ExecPlan.
+
+    A Declaration represents an unconstructed ExecNode, and potentially
+    more than one, since its inputs may themselves be Declarations, and
+    several Declarations can be combined into one with ``from_sequence``.
+
+    Parameters
+    ----------
+    factory_name : str
+        The ExecNode factory name, such as "table_source", "filter",
+        "project" etc.
Review Comment: I think users may be confused by this since it might not be clear to them why a "factory name" is a necessary concept at all.
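An up-front example might soften that; a sketch of a small pipeline (assuming `table` is a placeholder `pyarrow.Table` and the classes are in scope from this module):

```python
import pyarrow as pa
from pyarrow.compute import field

table = pa.table({"a": [1, 2, 3], "b": [4, 5, 6]})

# Each factory name selects the node type; the options configure that node.
decl = Declaration.from_sequence([
    Declaration("table_source", TableSourceNodeOptions(table)),
    Declaration("filter", FilterNodeOptions(field("a") > 1)),
    Declaration("project", ProjectNodeOptions([field("b")], ["b"])),
])
result = decl.to_table()
```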
