judahrand commented on code in PR #34234:
URL: https://github.com/apache/arrow/pull/34234#discussion_r1369276501
##########
python/pyarrow/acero.py:
##########
@@ -253,6 +254,93 @@ def _perform_join(join_type, left_operand, left_keys,
raise TypeError("Unsupported output type")
+def _perform_join_asof(left_operand, left_on, left_by,
+ right_operand, right_on, right_by,
+ tolerance, use_threads=True,
+ output_type=Table):
+ """
+ Perform asof join of two tables or datasets.
+
+ The result will be an output table with the result of the join operation
+
+ Parameters
+ ----------
+ left_operand : Table or Dataset
+ The left operand for the join operation.
+ left_on : str
+ The left key (or keys) on which the join operation should be performed.
+ left_by: str or list[str]
+ The left key (or keys) on which the join operation should be performed.
+ right_operand : Table or Dataset
+ The right operand for the join operation.
+ right_on : str or list[str]
+ The right key (or keys) on which the join operation should be performed.
+ right_by: str or list[str]
+ The right key (or keys) on which the join operation should be performed.
+ tolerance : int
+ The tolerance to use for the asof join. The tolerance is interpreted in
+ the same units as the "on" key.
+ output_type: Table or InMemoryDataset
+ The output type for the exec plan result.
+
+ Returns
+ -------
+ result_table : Table or InMemoryDataset
+ """
+ if not isinstance(left_operand, (Table, ds.Dataset)):
+ raise TypeError(f"Expected Table or Dataset, got {type(left_operand)}")
+ if not isinstance(right_operand, (Table, ds.Dataset)):
+ raise TypeError(f"Expected Table or Dataset, got {type(right_operand)}")
+
+ if not isinstance(left_by, (tuple, list)):
+ left_by = [left_by]
+ if not isinstance(right_by, (tuple, list)):
+ right_by = [right_by]
+
+ # AsofJoin does not return on or by columns for right_operand.
+ right_columns = [
+ col for col in right_operand.schema.names
+ if col not in [right_on] + right_by
+ ]
+ columns_collisions = set(left_operand.schema.names) & set(right_columns)
+ if columns_collisions:
+ raise ValueError(
+ "Columns {} present in both tables. AsofJoin does not support "
+ "column collisions.".format(columns_collisions),
+ )
+
+ # Add the join node to the execplan
+ if isinstance(left_operand, ds.Dataset):
+ left_source = _dataset_to_decl(left_operand, use_threads=use_threads)
+ else:
+ left_source = Declaration(
+ "table_source", TableSourceNodeOptions(left_operand),
+ )
+ if isinstance(right_operand, ds.Dataset):
+ right_source = _dataset_to_decl(right_operand, use_threads=use_threads)
+ else:
+ right_source = Declaration(
+ "table_source", TableSourceNodeOptions(right_operand)
+ )
+
+ join_opts = AsofJoinNodeOptions(
+ left_on, left_by, right_on, right_by, tolerance
+ )
+ decl = Declaration(
+ "asofjoin", options=join_opts, inputs=[left_source, right_source]
+ )
+
+ # Last time I checked the asof join node does not support using threads.
+ result_table = decl.to_table(use_threads=False)
Review Comment:
`use_threads=True` does seem to work! It didn't when I originally opened
this PR way back when, but now all seems well!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]