jorisvandenbossche commented on code in PR #34234:
URL: https://github.com/apache/arrow/pull/34234#discussion_r1135226479


##########
python/pyarrow/_dataset.pyx:
##########
@@ -857,6 +857,54 @@ cdef class Dataset(_Weakrefable):
                                               use_threads=use_threads, 
coalesce_keys=coalesce_keys,
                                               output_type=InMemoryDataset)
 
+    def join_asof(self, right_dataset, on, by, tolerance, right_on=None, 
right_by=None):
+        """
+        Perform an asof join between this dataset and another one.
+
+        Result of the join will be a new dataset, where further
+        operations can be applied.
+
+        Parameters
+        ----------
+        right_dataset : dataset
+            The dataset to join to the current one, acting as the right dataset
+            in the join operation.
+        on : str
+            The column from current dataset that should be used as the on key
+            of the join operation left side.
+        by : str or list[str]
+            The columns from current dataset that should be used as the by keys
+            of the join operation left side.

Review Comment:
   We will need to explain here what the difference is between the "on" and 
"by" keys.



##########
python/pyarrow/tests/test_exec_plan.py:
##########
@@ -321,3 +321,77 @@ def test_join_extension_array_column():
     result = ep._perform_join(
         "left outer", t1, ["colB"], t3, ["colC"])
     assert result["colB"] == pa.chunked_array(ext_array)
+
+
+@pytest.mark.parametrize("tolerance,expected", [
+    (
+        1,
+        {
+            "colA": [1, 1, 5, 6, 7],
+            "col2": ["a", "b", "a", "b", "f"],
+            "colC": [1., None, None, None, None],
+        },
+    ),
+    (
+        3,
+        {
+            "colA": [1, 1, 5, 6, 7],
+            "col2": ["a", "b", "a", "b", "f"],
+            "colC": [1., None, None, 3., None],
+        },
+    ),
+    (
+        -5,
+        {
+            "colA": [1, 1, 5, 6, 7],
+            "col2": ["a", "b", "a", "b", "f"],
+            "colC": [None, None, 1., None, None],
+        },
+    ),
+])
+@pytest.mark.parametrize("use_datasets", [False, True])
+def test_join_asof(tolerance, expected, use_datasets):
+    # Allocate table here instead of using parametrize
+    # this prevents having arrow allocated memory forever around.
+    expected = pa.table(expected)
+
+    t1 = pa.Table.from_pydict({
+        "colA": [1, 1, 5, 6, 7],
+        "col2": ["a", "b", "a", "b", "f"]
+    })
+
+    t2 = pa.Table.from_pydict({
+        "colB": [2, 9, 15],
+        "col3": ["a", "b", "g"],
+        "colC": [1., 3., 5.]
+    })
+
+    if use_datasets:
+        t1 = ds.dataset([t1])
+        t2 = ds.dataset([t2])
+
+    r = ep._perform_join_asof(t1, "colA", "col2", t2, "colB", "col3", 
tolerance)
+    r = r.combine_chunks()
+    r = r.sort_by("colA")
+    assert r == expected
+
+
+def test_table_join_asof_collisions():
+    t1 = pa.table({
+        "colA": [1, 2, 6],
+        "colB": [10, 20, 60],
+        "on": [1, 2, 3],
+        "colVals": ["a", "b", "f"]
+    })
+
+    t2 = pa.table({
+        "colB": [99, 20, 10],
+        "colVals": ["Z", "B", "A"],
+        "colUniq": [100, 200, 300],
+        "colA": [99, 2, 1],
+        "on": [2, 3, 4],
+    })
+
+    msg = "colVals present in both tables. AsofJoin does not support column 
collisions."
+    with pytest.raises(ValueError, match=msg):
+        ep._perform_join_asof(t1, "on", ["colA", "colB"], t2, "on", ["colA", 
"colB"], 1)

Review Comment:
   Another failure case to test: what happens if you pass "by" keys of 
different lengths (or specify columns whose types don't match)?



##########
python/pyarrow/_acero.pyx:
##########
@@ -340,6 +340,78 @@ class HashJoinNodeOptions(_HashJoinNodeOptions):
         )
 
 
+cdef class _AsofJoinNodeOptions(ExecNodeOptions):
+
+    def _set_options(self, left_on, left_by, right_on, right_by, tolerance):
+        cdef:
+            vector[CFieldRef] c_left_by
+            vector[CFieldRef] c_right_by
+            CAsofJoinKeys c_left_keys
+            CAsofJoinKeys c_right_keys
+            vector[CAsofJoinKeys] c_input_keys
+
+        # Prepare left AsofJoinNodeOption::Keys
+        if not isinstance(left_by, (list, tuple)):
+            left_by = [left_by]
+        for key in left_by:
+            c_left_by.push_back(_ensure_field_ref(key))
+
+        c_left_keys.on_key = _ensure_field_ref(left_on)
+        c_left_keys.by_key = c_left_by
+
+        c_input_keys.push_back(c_left_keys)
+
+        # Prepare right AsofJoinNodeOption::Keys
+        if not isinstance(right_by, (list, tuple)):
+            right_by = [right_by]
+        for key in right_by:
+            c_right_by.push_back(_ensure_field_ref(key))
+
+        c_right_keys.on_key = _ensure_field_ref(right_on)
+        c_right_keys.by_key = c_right_by
+
+        c_input_keys.push_back(c_right_keys)
+
+        self.wrapped.reset(
+            new CAsofJoinNodeOptions(
+                c_input_keys,
+                tolerance,
+            )
+        )
+
+
+class AsofJoinNodeOptions(_AsofJoinNodeOptions):
+    """
+    Make a node which implements 'as of join' operation.
+
+    This is the option class for the "asofjoin" node factory.
+
+    Parameters
+    ----------
+    left_on : str, Expression
+        The left key on which the join operation should be performed.
+        Can be a string column name or a field expression.
+    left_by: str, Expression or list[str]
+        The left keys on which the join operation should be performed.
+        Each key can be a string column name or a field expression,
+        or a list of such field references.
+    right_operand : Table or Dataset
+        The right operand for the join operation.

Review Comment:
   This can be left out of the options docstring (it's not a parameter).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to