This is an automated email from the ASF dual-hosted git repository.

timsaucer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-python.git


The following commit(s) were added to refs/heads/main by this push:
     new b4b03fe  feat: expose `join_on` (#914)
b4b03fe is described below

commit b4b03fe10fab72cc5606a193b58e1c9ae5031318
Author: Ion Koutsouris <[email protected]>
AuthorDate: Tue Oct 15 13:20:39 2024 +0200

    feat: expose `join_on` (#914)
    
    * feat: expose join_on method
    
    * test: improve join_on case
---
 python/datafusion/dataframe.py | 25 ++++++++++++++++++++++++-
 python/tests/test_dataframe.py | 36 ++++++++++++++++++++++++++++++++++++
 src/dataframe.rs               | 25 +++++++++++++++++++++++++
 3 files changed, 85 insertions(+), 1 deletion(-)

diff --git a/python/datafusion/dataframe.py b/python/datafusion/dataframe.py
index a9e4d4d..60203ff 100644
--- a/python/datafusion/dataframe.py
+++ b/python/datafusion/dataframe.py
@@ -21,7 +21,7 @@ See :ref:`user_guide_concepts` in the online documentation 
for more information.
 
 from __future__ import annotations
 
-from typing import Any, List, TYPE_CHECKING
+from typing import Any, List, TYPE_CHECKING, Literal
 from datafusion.record_batch import RecordBatchStream
 from typing_extensions import deprecated
 from datafusion.plan import LogicalPlan, ExecutionPlan
@@ -304,6 +304,29 @@ class DataFrame:
         """
         return DataFrame(self.df.join(right.df, join_keys, how))
 
+    def join_on(
+        self,
+        right: DataFrame,
+        *on_exprs: Expr,
+        how: Literal["inner", "left", "right", "full", "semi", "anti"] = 
"inner",
+    ) -> DataFrame:
+        """Join two :py:class:`DataFrame` objects using the specified expressions.
+
+        On expressions are used to support inequality predicates. Equality
+        predicates are correctly optimized.
+
+        Args:
+            right: Other DataFrame to join with.
+            on_exprs: One or more (in)equality predicates.
+            how: Type of join to perform. Supported types are "inner", "left",
+                "right", "full", "semi", "anti".
+
+        Returns:
+            DataFrame after join.
+        """
+        exprs = [expr.expr for expr in on_exprs]  # take each Expr's inner .expr
+        return DataFrame(self.df.join_on(right.df, exprs, how))
+
     def explain(self, verbose: bool = False, analyze: bool = False) -> 
DataFrame:
         """Return a DataFrame with the explanation of its plan so far.
 
diff --git a/python/tests/test_dataframe.py b/python/tests/test_dataframe.py
index 88c642a..6330ede 100644
--- a/python/tests/test_dataframe.py
+++ b/python/tests/test_dataframe.py
@@ -270,6 +270,42 @@ def test_join():
     assert table.to_pydict() == expected
 
 
+def test_join_on():  # join_on with one predicate, then with two predicates
+    ctx = SessionContext()
+
+    batch = pa.RecordBatch.from_arrays(
+        [pa.array([1, 2, 3]), pa.array([4, 5, 6])],
+        names=["a", "b"],
+    )
+    df = ctx.create_dataframe([[batch]], "l")  # left side, referenced as "l" below
+
+    batch = pa.RecordBatch.from_arrays(
+        [pa.array([1, 2]), pa.array([-8, 10])],
+        names=["a", "c"],
+    )
+    df1 = ctx.create_dataframe([[batch]], "r")  # right side, referenced as "r" below
+
+    df2 = df.join_on(df1, column("l.a").__eq__(column("r.a")), how="inner")
+    df2.show()
+    df2 = df2.sort(column("l.a"))  # sort so the row order is deterministic
+    table = pa.Table.from_batches(df2.collect())
+
+    expected = {"a": [1, 2], "c": [-8, 10], "b": [4, 5]}  # l.a=3 has no match in r
+    assert table.to_pydict() == expected
+
+    df3 = df.join_on(  # same equality predicate plus an inequality predicate
+        df1,
+        column("l.a").__eq__(column("r.a")),
+        column("l.a").__lt__(column("r.c")),
+        how="inner",
+    )
+    df3.show()
+    df3 = df3.sort(column("l.a"))
+    table = pa.Table.from_batches(df3.collect())
+    expected = {"a": [2], "c": [10], "b": [5]}  # a=1 is dropped: 1 < -8 is false
+    assert table.to_pydict() == expected
+
+
 def test_distinct():
     ctx = SessionContext()
 
diff --git a/src/dataframe.rs b/src/dataframe.rs
index db24370..fa6c1d4 100644
--- a/src/dataframe.rs
+++ b/src/dataframe.rs
@@ -300,6 +300,31 @@ impl PyDataFrame {
         Ok(Self::new(df))
     }
 
+    fn join_on(&self, right: PyDataFrame, on_exprs: Vec<PyExpr>, how: &str) -> 
PyResult<Self> { // Join this frame with `right` using `on_exprs`; `how` names the join type.
+        let join_type = match how { // translate the `how` string into a `JoinType`
+            "inner" => JoinType::Inner,
+            "left" => JoinType::Left,
+            "right" => JoinType::Right,
+            "full" => JoinType::Full,
+            "semi" => JoinType::LeftSemi, // "semi" selects the left semi join
+            "anti" => JoinType::LeftAnti, // "anti" selects the left anti join
+            how => { // any other string is rejected with an error
+                return Err(DataFusionError::Common(format!(
+                    "The join type {how} does not exist or is not implemented"
+                ))
+                .into());
+            }
+        };
+        let exprs: Vec<Expr> = on_exprs.into_iter().map(|e| 
e.into()).collect(); // convert each `PyExpr` into an `Expr`
+
+        let df = self
+            .df
+            .as_ref()
+            .clone() // NOTE(review): presumably cloned because `join_on` takes the frame by value; confirm
+            .join_on(right.df.as_ref().clone(), join_type, exprs)?;
+        Ok(Self::new(df))
+    }
+
     /// Print the query plan
     #[pyo3(signature = (verbose=false, analyze=false))]
     fn explain(&self, py: Python, verbose: bool, analyze: bool) -> 
PyResult<()> {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to