This is an automated email from the ASF dual-hosted git repository.
timsaucer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-python.git
The following commit(s) were added to refs/heads/main by this push:
new b4b03fe feat: expose `join_on` (#914)
b4b03fe is described below
commit b4b03fe10fab72cc5606a193b58e1c9ae5031318
Author: Ion Koutsouris <[email protected]>
AuthorDate: Tue Oct 15 13:20:39 2024 +0200
feat: expose `join_on` (#914)
* feat: expose join_on method
* test: improve join_on case
---
python/datafusion/dataframe.py | 25 ++++++++++++++++++++++++-
python/tests/test_dataframe.py | 36 ++++++++++++++++++++++++++++++++++++
src/dataframe.rs | 25 +++++++++++++++++++++++++
3 files changed, 85 insertions(+), 1 deletion(-)
diff --git a/python/datafusion/dataframe.py b/python/datafusion/dataframe.py
index a9e4d4d..60203ff 100644
--- a/python/datafusion/dataframe.py
+++ b/python/datafusion/dataframe.py
@@ -21,7 +21,7 @@ See :ref:`user_guide_concepts` in the online documentation
for more information.
from __future__ import annotations
-from typing import Any, List, TYPE_CHECKING
+from typing import Any, List, TYPE_CHECKING, Literal
from datafusion.record_batch import RecordBatchStream
from typing_extensions import deprecated
from datafusion.plan import LogicalPlan, ExecutionPlan
@@ -304,6 +304,29 @@ class DataFrame:
"""
return DataFrame(self.df.join(right.df, join_keys, how))
+ def join_on(
+ self,
+ right: DataFrame,
+ *on_exprs: Expr,
+ how: Literal["inner", "left", "right", "full", "semi", "anti"] =
"inner",
+ ) -> DataFrame:
+ """Join two :py:class:`DataFrame` using the specified expressions.
+
+ On expressions are used to support in-equality predicates. Equality
+ predicates are correctly optimized.
+
+ Args:
+ right: Other DataFrame to join with.
+ on_exprs: single or multiple (in)-equality predicates.
+ how: Type of join to perform. Supported types are "inner", "left",
+ "right", "full", "semi", "anti".
+
+ Returns:
+ DataFrame after join.
+ """
+ exprs = [expr.expr for expr in on_exprs]
+ return DataFrame(self.df.join_on(right.df, exprs, how))
+
def explain(self, verbose: bool = False, analyze: bool = False) ->
DataFrame:
"""Return a DataFrame with the explanation of its plan so far.
diff --git a/python/tests/test_dataframe.py b/python/tests/test_dataframe.py
index 88c642a..6330ede 100644
--- a/python/tests/test_dataframe.py
+++ b/python/tests/test_dataframe.py
@@ -270,6 +270,42 @@ def test_join():
assert table.to_pydict() == expected
+def test_join_on():
+ ctx = SessionContext()
+
+ batch = pa.RecordBatch.from_arrays(
+ [pa.array([1, 2, 3]), pa.array([4, 5, 6])],
+ names=["a", "b"],
+ )
+ df = ctx.create_dataframe([[batch]], "l")
+
+ batch = pa.RecordBatch.from_arrays(
+ [pa.array([1, 2]), pa.array([-8, 10])],
+ names=["a", "c"],
+ )
+ df1 = ctx.create_dataframe([[batch]], "r")
+
+ df2 = df.join_on(df1, column("l.a").__eq__(column("r.a")), how="inner")
+ df2.show()
+ df2 = df2.sort(column("l.a"))
+ table = pa.Table.from_batches(df2.collect())
+
+ expected = {"a": [1, 2], "c": [-8, 10], "b": [4, 5]}
+ assert table.to_pydict() == expected
+
+ df3 = df.join_on(
+ df1,
+ column("l.a").__eq__(column("r.a")),
+ column("l.a").__lt__(column("r.c")),
+ how="inner",
+ )
+ df3.show()
+ df3 = df3.sort(column("l.a"))
+ table = pa.Table.from_batches(df3.collect())
+ expected = {"a": [2], "c": [10], "b": [5]}
+ assert table.to_pydict() == expected
+
+
def test_distinct():
ctx = SessionContext()
diff --git a/src/dataframe.rs b/src/dataframe.rs
index db24370..fa6c1d4 100644
--- a/src/dataframe.rs
+++ b/src/dataframe.rs
@@ -300,6 +300,31 @@ impl PyDataFrame {
Ok(Self::new(df))
}
+ fn join_on(&self, right: PyDataFrame, on_exprs: Vec<PyExpr>, how: &str) ->
PyResult<Self> {
+ let join_type = match how {
+ "inner" => JoinType::Inner,
+ "left" => JoinType::Left,
+ "right" => JoinType::Right,
+ "full" => JoinType::Full,
+ "semi" => JoinType::LeftSemi,
+ "anti" => JoinType::LeftAnti,
+ how => {
+ return Err(DataFusionError::Common(format!(
+ "The join type {how} does not exist or is not implemented"
+ ))
+ .into());
+ }
+ };
+ let exprs: Vec<Expr> = on_exprs.into_iter().map(|e|
e.into()).collect();
+
+ let df = self
+ .df
+ .as_ref()
+ .clone()
+ .join_on(right.df.as_ref().clone(), join_type, exprs)?;
+ Ok(Self::new(df))
+ }
+
/// Print the query plan
#[pyo3(signature = (verbose=false, analyze=false))]
fn explain(&self, py: Python, verbose: bool, analyze: bool) ->
PyResult<()> {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]