jorisvandenbossche commented on code in PR #34234:
URL: https://github.com/apache/arrow/pull/34234#discussion_r1507683277
##########
python/pyarrow/_acero.pyx:
##########
@@ -392,6 +392,84 @@ class HashJoinNodeOptions(_HashJoinNodeOptions):
)
+cdef class _AsofJoinNodeOptions(ExecNodeOptions):
+
+ def _set_options(self, left_on, left_by, right_on, right_by, tolerance):
+ cdef:
+ vector[CFieldRef] c_left_by
+ vector[CFieldRef] c_right_by
+ CAsofJoinKeys c_left_keys
+ CAsofJoinKeys c_right_keys
+ vector[CAsofJoinKeys] c_input_keys
+
+ # Prepare left AsofJoinNodeOption::Keys
+ if not isinstance(left_by, (list, tuple)):
+ left_by = [left_by]
+ for key in left_by:
+ c_left_by.push_back(_ensure_field_ref(key))
+
+ c_left_keys.on_key = _ensure_field_ref(left_on)
+ c_left_keys.by_key = c_left_by
+
+ c_input_keys.push_back(c_left_keys)
+
+ # Prepare right AsofJoinNodeOption::Keys
+ if not isinstance(right_by, (list, tuple)):
+ right_by = [right_by]
+ for key in right_by:
+ c_right_by.push_back(_ensure_field_ref(key))
+
+ c_right_keys.on_key = _ensure_field_ref(right_on)
+ c_right_keys.by_key = c_right_by
+
+ c_input_keys.push_back(c_right_keys)
+
+ self.wrapped.reset(
+ new CAsofJoinNodeOptions(
+ c_input_keys,
+ tolerance,
+ )
+ )
+
+
+class AsofJoinNodeOptions(_AsofJoinNodeOptions):
+ """
+ Make a node which implements 'as of join' operation.
+
+ This is the option class for the "asofjoin" node factory.
+
+ Parameters
+ ----------
+ left_on : str, Expression
+ The left key on which the join operation should be performed.
+ Can be a string column name or a field expression.
+
+ An inexact match is used on the “on” key.i.e., a row is considered a
+ match iff left_on - tolerance <= right_on <= left_on.
+
+ The input dataset must be sorted by the “on” key. Must be a single
Review Comment:
```suggestion
The input dataset must be sorted by the "on" key. Must be a single
```
##########
python/pyarrow/_acero.pyx:
##########
@@ -392,6 +392,84 @@ class HashJoinNodeOptions(_HashJoinNodeOptions):
)
+cdef class _AsofJoinNodeOptions(ExecNodeOptions):
+
+ def _set_options(self, left_on, left_by, right_on, right_by, tolerance):
+ cdef:
+ vector[CFieldRef] c_left_by
+ vector[CFieldRef] c_right_by
+ CAsofJoinKeys c_left_keys
+ CAsofJoinKeys c_right_keys
+ vector[CAsofJoinKeys] c_input_keys
+
+ # Prepare left AsofJoinNodeOption::Keys
+ if not isinstance(left_by, (list, tuple)):
+ left_by = [left_by]
+ for key in left_by:
+ c_left_by.push_back(_ensure_field_ref(key))
+
+ c_left_keys.on_key = _ensure_field_ref(left_on)
+ c_left_keys.by_key = c_left_by
+
+ c_input_keys.push_back(c_left_keys)
+
+ # Prepare right AsofJoinNodeOption::Keys
+ if not isinstance(right_by, (list, tuple)):
+ right_by = [right_by]
+ for key in right_by:
+ c_right_by.push_back(_ensure_field_ref(key))
+
+ c_right_keys.on_key = _ensure_field_ref(right_on)
+ c_right_keys.by_key = c_right_by
+
+ c_input_keys.push_back(c_right_keys)
+
+ self.wrapped.reset(
+ new CAsofJoinNodeOptions(
+ c_input_keys,
+ tolerance,
+ )
+ )
+
+
+class AsofJoinNodeOptions(_AsofJoinNodeOptions):
+ """
+ Make a node which implements 'as of join' operation.
+
+ This is the option class for the "asofjoin" node factory.
+
+ Parameters
+ ----------
+ left_on : str, Expression
+ The left key on which the join operation should be performed.
+ Can be a string column name or a field expression.
+
+ An inexact match is used on the “on” key.i.e., a row is considered a
+ match iff left_on - tolerance <= right_on <= left_on.
+
+ The input dataset must be sorted by the “on” key. Must be a single
+ field of a common type.
+
+ Currently, the “on” key must be an integer, date, or timestamp type.
+ left_by: str, Expression or list[str]
+ The left keys on which the join operation should be performed.
Review Comment:
```suggestion
The left keys on which the join operation should be performed.
Exact equality is used for each field of the "by" keys.
```
##########
python/pyarrow/_acero.pyx:
##########
@@ -392,6 +392,84 @@ class HashJoinNodeOptions(_HashJoinNodeOptions):
)
+cdef class _AsofJoinNodeOptions(ExecNodeOptions):
+
+ def _set_options(self, left_on, left_by, right_on, right_by, tolerance):
+ cdef:
+ vector[CFieldRef] c_left_by
+ vector[CFieldRef] c_right_by
+ CAsofJoinKeys c_left_keys
+ CAsofJoinKeys c_right_keys
+ vector[CAsofJoinKeys] c_input_keys
+
+ # Prepare left AsofJoinNodeOption::Keys
+ if not isinstance(left_by, (list, tuple)):
+ left_by = [left_by]
+ for key in left_by:
+ c_left_by.push_back(_ensure_field_ref(key))
+
+ c_left_keys.on_key = _ensure_field_ref(left_on)
+ c_left_keys.by_key = c_left_by
+
+ c_input_keys.push_back(c_left_keys)
+
+ # Prepare right AsofJoinNodeOption::Keys
+ if not isinstance(right_by, (list, tuple)):
+ right_by = [right_by]
+ for key in right_by:
+ c_right_by.push_back(_ensure_field_ref(key))
+
+ c_right_keys.on_key = _ensure_field_ref(right_on)
+ c_right_keys.by_key = c_right_by
+
+ c_input_keys.push_back(c_right_keys)
+
+ self.wrapped.reset(
+ new CAsofJoinNodeOptions(
+ c_input_keys,
+ tolerance,
+ )
+ )
+
+
+class AsofJoinNodeOptions(_AsofJoinNodeOptions):
+ """
+ Make a node which implements 'as of join' operation.
+
+ This is the option class for the "asofjoin" node factory.
+
+ Parameters
+ ----------
+ left_on : str, Expression
+ The left key on which the join operation should be performed.
+ Can be a string column name or a field expression.
+
+ An inexact match is used on the “on” key.i.e., a row is considered a
+ match iff left_on - tolerance <= right_on <= left_on.
+
+ The input dataset must be sorted by the “on” key. Must be a single
+ field of a common type.
+
+ Currently, the “on” key must be an integer, date, or timestamp type.
+ left_by: str, Expression or list[str]
Review Comment:
```suggestion
left_by: str, Expression or list
```
(the list can also be a list of Expressions)
##########
python/pyarrow/_acero.pyx:
##########
@@ -392,6 +392,84 @@ class HashJoinNodeOptions(_HashJoinNodeOptions):
)
+cdef class _AsofJoinNodeOptions(ExecNodeOptions):
+
+ def _set_options(self, left_on, left_by, right_on, right_by, tolerance):
+ cdef:
+ vector[CFieldRef] c_left_by
+ vector[CFieldRef] c_right_by
+ CAsofJoinKeys c_left_keys
+ CAsofJoinKeys c_right_keys
+ vector[CAsofJoinKeys] c_input_keys
+
+ # Prepare left AsofJoinNodeOption::Keys
+ if not isinstance(left_by, (list, tuple)):
+ left_by = [left_by]
+ for key in left_by:
+ c_left_by.push_back(_ensure_field_ref(key))
+
+ c_left_keys.on_key = _ensure_field_ref(left_on)
+ c_left_keys.by_key = c_left_by
+
+ c_input_keys.push_back(c_left_keys)
+
+ # Prepare right AsofJoinNodeOption::Keys
+ if not isinstance(right_by, (list, tuple)):
+ right_by = [right_by]
+ for key in right_by:
+ c_right_by.push_back(_ensure_field_ref(key))
+
+ c_right_keys.on_key = _ensure_field_ref(right_on)
+ c_right_keys.by_key = c_right_by
+
+ c_input_keys.push_back(c_right_keys)
+
+ self.wrapped.reset(
+ new CAsofJoinNodeOptions(
+ c_input_keys,
+ tolerance,
+ )
+ )
+
+
+class AsofJoinNodeOptions(_AsofJoinNodeOptions):
+ """
+ Make a node which implements 'as of join' operation.
+
+ This is the option class for the "asofjoin" node factory.
+
+ Parameters
+ ----------
+ left_on : str, Expression
+ The left key on which the join operation should be performed.
+ Can be a string column name or a field expression.
+
+ An inexact match is used on the “on” key.i.e., a row is considered a
+ match iff left_on - tolerance <= right_on <= left_on.
+
+ The input dataset must be sorted by the “on” key. Must be a single
+ field of a common type.
+
+ Currently, the “on” key must be an integer, date, or timestamp type.
+ left_by: str, Expression or list[str]
+ The left keys on which the join operation should be performed.
+ Each key can be a string column name or a field expression,
+ or a list of such field references.
+ right_on : str, Expression
+ The right key on which the join operation should be performed.
+ See `left_on` for details.
+ right_by: str, Expression or list[str]
Review Comment:
```suggestion
right_by: str, Expression or list
```
##########
python/pyarrow/tests/test_exec_plan.py:
##########
@@ -323,6 +323,83 @@ def test_join_extension_array_column():
assert result["colB"] == pa.chunked_array(ext_array)
[email protected]("tolerance,expected", [
+ (
+ 1,
+ {
+ "colA": [1, 1, 5, 6, 7],
+ "col2": ["a", "b", "a", "b", "f"],
+ "colC": [1., None, None, None, None],
+ },
+ ),
+ (
+ 3,
+ {
+ "colA": [1, 1, 5, 6, 7],
+ "col2": ["a", "b", "a", "b", "f"],
+ "colC": [1., None, None, 3., None],
+ },
+ ),
+ (
+ -5,
+ {
+ "colA": [1, 1, 5, 6, 7],
+ "col2": ["a", "b", "a", "b", "f"],
+ "colC": [None, None, 1., None, None],
+ },
+ ),
+])
[email protected]("use_datasets", [False, True])
+def test_join_asof(tolerance, expected, use_datasets):
Review Comment:
This `test_exec_plan.py` file exists mostly for historical reasons, from
before we had the Acero bindings. Do the tests you added here cover anything in
addition to what is already tested in `test_acero.py` or
`test_table.py`/`test_dataset.py`? If not, I think you can leave out those
tests; if they do test something else, add that to one of the other files.
##########
python/pyarrow/table.pxi:
##########
@@ -4859,6 +4859,91 @@ cdef class Table(_Tabular):
output_type=Table
)
+ def join_asof(self, right_table, on, by, tolerance, right_on=None,
right_by=None):
+ """
+ Perform an asof join between this table and another one.
+
Review Comment:
I would still like to see a somewhat more expanded explanation (apart from the
individual keyword explanations) of _what_ an asof join exactly is.
Something indicating that it does 1) an inexact join, 2) on a sorted dataset, 3)
potentially first joining on other attributes, and 4) is typically useful for time
series data that are not perfectly aligned. Are those the most relevant
characteristics?
##########
python/pyarrow/_acero.pyx:
##########
@@ -392,6 +392,84 @@ class HashJoinNodeOptions(_HashJoinNodeOptions):
)
+cdef class _AsofJoinNodeOptions(ExecNodeOptions):
+
+ def _set_options(self, left_on, left_by, right_on, right_by, tolerance):
+ cdef:
+ vector[CFieldRef] c_left_by
+ vector[CFieldRef] c_right_by
+ CAsofJoinKeys c_left_keys
+ CAsofJoinKeys c_right_keys
+ vector[CAsofJoinKeys] c_input_keys
+
+ # Prepare left AsofJoinNodeOption::Keys
+ if not isinstance(left_by, (list, tuple)):
+ left_by = [left_by]
+ for key in left_by:
+ c_left_by.push_back(_ensure_field_ref(key))
+
+ c_left_keys.on_key = _ensure_field_ref(left_on)
+ c_left_keys.by_key = c_left_by
+
+ c_input_keys.push_back(c_left_keys)
+
+ # Prepare right AsofJoinNodeOption::Keys
+ if not isinstance(right_by, (list, tuple)):
+ right_by = [right_by]
+ for key in right_by:
+ c_right_by.push_back(_ensure_field_ref(key))
+
+ c_right_keys.on_key = _ensure_field_ref(right_on)
+ c_right_keys.by_key = c_right_by
+
+ c_input_keys.push_back(c_right_keys)
+
+ self.wrapped.reset(
+ new CAsofJoinNodeOptions(
+ c_input_keys,
+ tolerance,
+ )
+ )
+
+
+class AsofJoinNodeOptions(_AsofJoinNodeOptions):
+ """
+ Make a node which implements 'as of join' operation.
+
+ This is the option class for the "asofjoin" node factory.
+
+ Parameters
+ ----------
+ left_on : str, Expression
+ The left key on which the join operation should be performed.
+ Can be a string column name or a field expression.
+
+ An inexact match is used on the “on” key.i.e., a row is considered a
+ match iff left_on - tolerance <= right_on <= left_on.
+
+ The input dataset must be sorted by the “on” key. Must be a single
+ field of a common type.
+
+ Currently, the “on” key must be an integer, date, or timestamp type.
Review Comment:
```suggestion
Currently, the "on" key must be an integer, date, or timestamp type.
```
##########
python/pyarrow/_acero.pyx:
##########
@@ -392,6 +392,84 @@ class HashJoinNodeOptions(_HashJoinNodeOptions):
)
+cdef class _AsofJoinNodeOptions(ExecNodeOptions):
+
+ def _set_options(self, left_on, left_by, right_on, right_by, tolerance):
+ cdef:
+ vector[CFieldRef] c_left_by
+ vector[CFieldRef] c_right_by
+ CAsofJoinKeys c_left_keys
+ CAsofJoinKeys c_right_keys
+ vector[CAsofJoinKeys] c_input_keys
+
+ # Prepare left AsofJoinNodeOption::Keys
+ if not isinstance(left_by, (list, tuple)):
+ left_by = [left_by]
+ for key in left_by:
+ c_left_by.push_back(_ensure_field_ref(key))
+
+ c_left_keys.on_key = _ensure_field_ref(left_on)
+ c_left_keys.by_key = c_left_by
+
+ c_input_keys.push_back(c_left_keys)
+
+ # Prepare right AsofJoinNodeOption::Keys
+ if not isinstance(right_by, (list, tuple)):
+ right_by = [right_by]
+ for key in right_by:
+ c_right_by.push_back(_ensure_field_ref(key))
+
+ c_right_keys.on_key = _ensure_field_ref(right_on)
+ c_right_keys.by_key = c_right_by
+
+ c_input_keys.push_back(c_right_keys)
+
+ self.wrapped.reset(
+ new CAsofJoinNodeOptions(
+ c_input_keys,
+ tolerance,
+ )
+ )
+
+
+class AsofJoinNodeOptions(_AsofJoinNodeOptions):
+ """
+ Make a node which implements 'as of join' operation.
+
+ This is the option class for the "asofjoin" node factory.
+
+ Parameters
+ ----------
+ left_on : str, Expression
+ The left key on which the join operation should be performed.
+ Can be a string column name or a field expression.
+
+ An inexact match is used on the “on” key.i.e., a row is considered a
+ match iff left_on - tolerance <= right_on <= left_on.
Review Comment:
```suggestion
An inexact match is used on the "on" key, i.e. a row is considered a
match if and only if left_on - tolerance <= right_on <= left_on.
```
##########
python/pyarrow/table.pxi:
##########
@@ -4859,6 +4859,91 @@ cdef class Table(_Tabular):
output_type=Table
)
+ def join_asof(self, right_table, on, by, tolerance, right_on=None,
right_by=None):
+ """
+ Perform an asof join between this table and another one.
+
+ Result of the join will be a new Table, where further
+ operations can be applied.
+
+ Parameters
+ ----------
+ right_table : Table
+ The table to join to the current one, acting as the right table
+ in the join operation.
+ on : str
+ The column from current table that should be used as the on key
+ of the join operation left side.
+
+ An inexact match is used on the “on” key.i.e., a row is considered
a
+ match iff left_on - tolerance <= right_on <= left_on.
+
+ The input dataset must be sorted by the “on” key. Must be a single
+ field of a common type.
+
+ Currently, the “on” key must be an integer, date, or timestamp
type.
+ by : str or list[str]
+ The columns from current table that should be used as the keys
+ of the join operation left side. The join operation is then done
+ only for the matches in these columns.
+ tolerance : int
+ The tolerance for inexact "on" key matching. A right row is
considered
+ a match with the left row ``right.on - left.on <= tolerance``. The
+ ``tolerance`` may be:
+
+ - negative, in which case a past-as-of-join occurs;
+ - or positive, in which case a future-as-of-join occurs;
+ - or zero, in which case an exact-as-of-join occurs.
+
+ The tolerance is interpreted in the same units as the "on" key.
+ right_on : str or list[str], default None
+ The columns from the right_table that should be used as the on key
+ on the join operation right side.
+ When ``None`` use the same key name as the left table.
+ right_by : str or list[str], default None
+ The columns from the right_table that should be used as keys
+ on the join operation right side.
+ When ``None`` use the same key names as the left table.
+
+ Returns
+ -------
+ Table
+
+ Example
+ --------
+ >>> import pandas as pd
+ >>> import pyarrow as pa
+ >>> df1 = pd.DataFrame({'id': [1, 2, 3],
+ ... 'year': [2020, 2022, 2019]})
+ >>> df2 = pd.DataFrame({'id': [3, 4],
+ ... 'year': [2020, 2021],
+ ... 'n_legs': [5, 100],
+ ... 'animal': ["Brittle stars", "Centipede"]})
+ >>> t1 = pa.Table.from_pandas(df1).sort_by('year')
+ >>> t2 = pa.Table.from_pandas(df2).sort_by('year')
+
+ >>> t1.join_asof(
+ ... t2, on='year', by='id', tolerance=1
Review Comment:
The fact that there is no repetition of the values in the "by" key ("id") in the
example data makes it difficult to see what exactly happens with the
"on" key.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]