This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-python.git
The following commit(s) were added to refs/heads/main by this push:
new 6cd74a9 add LogicalPlan Join and CrossJoin (#246)
6cd74a9 is described below
commit 6cd74a9ea97615db1bbd76ce7efd2fe20823c675
Author: Ian Alexander Joiner <[email protected]>
AuthorDate: Tue Feb 28 21:11:29 2023 -0500
add LogicalPlan Join and CrossJoin (#246)
---
datafusion/tests/test_imports.py | 8 ++
src/expr.rs | 6 ++
src/expr/cross_join.rs | 90 +++++++++++++++++++
src/expr/join.rs | 181 +++++++++++++++++++++++++++++++++++++++
src/expr/signature.rs | 1 +
5 files changed, 286 insertions(+)
diff --git a/datafusion/tests/test_imports.py b/datafusion/tests/test_imports.py
index 8792fac..77309e2 100644
--- a/datafusion/tests/test_imports.py
+++ b/datafusion/tests/test_imports.py
@@ -44,6 +44,10 @@ from datafusion.expr import (
Aggregate,
Sort,
Analyze,
+ Join,
+ JoinType,
+ JoinConstraint,
+ CrossJoin,
Union,
Like,
ILike,
@@ -106,6 +110,10 @@ def test_class_module_is_datafusion():
Limit,
Filter,
Analyze,
+ Join,
+ JoinType,
+ JoinConstraint,
+ CrossJoin,
Union,
Like,
ILike,
diff --git a/src/expr.rs b/src/expr.rs
index c4a9f00..cf1bc5d 100644
--- a/src/expr.rs
+++ b/src/expr.rs
@@ -47,6 +47,7 @@ pub mod bool_expr;
pub mod case;
pub mod cast;
pub mod column;
+pub mod cross_join;
pub mod empty_relation;
pub mod exists;
pub mod filter;
@@ -54,6 +55,7 @@ pub mod grouping_set;
pub mod in_list;
pub mod in_subquery;
pub mod indexed_field;
+pub mod join;
pub mod like;
pub mod limit;
pub mod literal;
@@ -267,6 +269,10 @@ pub(crate) fn init_module(m: &PyModule) -> PyResult<()> {
m.add_class::<sort::PySort>()?;
m.add_class::<analyze::PyAnalyze>()?;
m.add_class::<empty_relation::PyEmptyRelation>()?;
+ m.add_class::<join::PyJoin>()?;
+ m.add_class::<join::PyJoinType>()?;
+ m.add_class::<join::PyJoinConstraint>()?;
+ m.add_class::<cross_join::PyCrossJoin>()?;
m.add_class::<union::PyUnion>()?;
Ok(())
}
diff --git a/src/expr/cross_join.rs b/src/expr/cross_join.rs
new file mode 100644
index 0000000..4f5952c
--- /dev/null
+++ b/src/expr/cross_join.rs
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datafusion_expr::logical_plan::CrossJoin;
+use pyo3::prelude::*;
+use std::fmt::{self, Display, Formatter};
+
+use crate::common::df_schema::PyDFSchema;
+use crate::expr::logical_node::LogicalNode;
+use crate::sql::logical::PyLogicalPlan;
+
+/// Python wrapper around DataFusion's `CrossJoin` logical plan node.
+#[pyclass(name = "CrossJoin", module = "datafusion.expr", subclass)]
+#[derive(Clone)]
+pub struct PyCrossJoin {
+    cross_join: CrossJoin,
+}
+
+impl From<CrossJoin> for PyCrossJoin {
+    // Wrap a DataFusion node so it can be handed to Python.
+    fn from(cross_join: CrossJoin) -> Self {
+        Self { cross_join }
+    }
+}
+
+impl From<PyCrossJoin> for CrossJoin {
+    // Unwrap back to the DataFusion node by destructuring the wrapper.
+    fn from(wrapper: PyCrossJoin) -> Self {
+        let PyCrossJoin { cross_join } = wrapper;
+        cross_join
+    }
+}
+
+impl Display for PyCrossJoin {
+    /// Renders the node as a multi-line summary: one line each for the left
+    /// input, the right input and the output schema, all in `Debug` form.
+    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
+        // `\` line continuations strip the source indentation, so each field
+        // starts on its own line. A raw multi-line literal would instead embed
+        // a newline plus indentation before every `\n`, producing
+        // whitespace-only lines between the fields.
+        write!(
+            f,
+            "CrossJoin\n\
+             Left: {:?}\n\
+             Right: {:?}\n\
+             Schema: {:?}",
+            &self.cross_join.left, &self.cross_join.right, &self.cross_join.schema
+        )
+    }
+}
+
+#[pymethods]
+impl PyCrossJoin {
+    /// Returns the left input `LogicalPlan` of this `CrossJoin` node
+    fn left(&self) -> PyResult<PyLogicalPlan> {
+        let plan = (*self.cross_join.left).clone();
+        Ok(PyLogicalPlan::from(plan))
+    }
+
+    /// Returns the right input `LogicalPlan` of this `CrossJoin` node
+    fn right(&self) -> PyResult<PyLogicalPlan> {
+        let plan = (*self.cross_join.right).clone();
+        Ok(PyLogicalPlan::from(plan))
+    }
+
+    /// Returns the output schema produced by this `CrossJoin` node
+    fn schema(&self) -> PyResult<PyDFSchema> {
+        let schema = (*self.cross_join.schema).clone();
+        Ok(schema.into())
+    }
+
+    /// Python `repr()`: this node's `Display` summary wrapped in `CrossJoin(...)`
+    fn __repr__(&self) -> PyResult<String> {
+        let summary = self.to_string();
+        Ok(format!("CrossJoin({})", summary))
+    }
+
+    /// Name of this logical plan node variant
+    fn __name__(&self) -> PyResult<String> {
+        Ok(String::from("CrossJoin"))
+    }
+}
+
+impl LogicalNode for PyCrossJoin {
+    /// Both inputs of the cross join, left first and then right.
+    fn inputs(&self) -> Vec<PyLogicalPlan> {
+        let left = (*self.cross_join.left).clone();
+        let right = (*self.cross_join.right).clone();
+        vec![PyLogicalPlan::from(left), PyLogicalPlan::from(right)]
+    }
+}
diff --git a/src/expr/join.rs b/src/expr/join.rs
new file mode 100644
index 0000000..428fb67
--- /dev/null
+++ b/src/expr/join.rs
@@ -0,0 +1,181 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datafusion_expr::logical_plan::{Join, JoinConstraint, JoinType};
+use pyo3::prelude::*;
+use std::fmt::{self, Display, Formatter};
+
+use crate::common::df_schema::PyDFSchema;
+use crate::expr::{logical_node::LogicalNode, PyExpr};
+use crate::sql::logical::PyLogicalPlan;
+
+/// Python wrapper around DataFusion's `JoinType`.
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+#[pyclass(name = "JoinType", module = "datafusion.expr")]
+pub struct PyJoinType {
+    join_type: JoinType,
+}
+
+impl From<JoinType> for PyJoinType {
+    // Wrap a DataFusion join type for Python.
+    fn from(join_type: JoinType) -> Self {
+        Self { join_type }
+    }
+}
+
+impl From<PyJoinType> for JoinType {
+    // Unwrap back to the DataFusion join type by destructuring the wrapper.
+    fn from(wrapper: PyJoinType) -> Self {
+        let PyJoinType { join_type } = wrapper;
+        join_type
+    }
+}
+
+#[pymethods]
+impl PyJoinType {
+    /// Returns whether this join type is an outer join, as reported by
+    /// DataFusion's `JoinType::is_outer`.
+    pub fn is_outer(&self) -> bool {
+        self.join_type.is_outer()
+    }
+
+    /// Python `repr()`: DataFusion's `Display` text for the wrapped join type,
+    /// wrapped in `JoinType(...)` to match the `Join(...)` / `CrossJoin(...)`
+    /// reprs. Without this, Python only shows the default object repr.
+    fn __repr__(&self) -> PyResult<String> {
+        Ok(format!("JoinType({})", self.join_type))
+    }
+}
+
+impl Display for PyJoinType {
+    /// Emits exactly DataFusion's `Display` text for the wrapped join type.
+    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
+        let inner = &self.join_type;
+        f.write_fmt(format_args!("{}", inner))
+    }
+}
+
+/// Python wrapper around DataFusion's `JoinConstraint`.
+#[derive(Debug, Clone, Copy)]
+#[pyclass(name = "JoinConstraint", module = "datafusion.expr")]
+pub struct PyJoinConstraint {
+    join_constraint: JoinConstraint,
+}
+
+impl From<JoinConstraint> for PyJoinConstraint {
+    fn from(join_constraint: JoinConstraint) -> PyJoinConstraint {
+        PyJoinConstraint { join_constraint }
+    }
+}
+
+impl From<PyJoinConstraint> for JoinConstraint {
+    fn from(join_constraint: PyJoinConstraint) -> Self {
+        join_constraint.join_constraint
+    }
+}
+
+// The class is registered with Python, but it previously had no methods, so
+// Python code could not see which constraint it held. Expose a readable repr.
+#[pymethods]
+impl PyJoinConstraint {
+    /// Python `repr()`: the `Debug` rendering of the wrapped DataFusion
+    /// `JoinConstraint`, wrapped in `JoinConstraint(...)`.
+    fn __repr__(&self) -> PyResult<String> {
+        Ok(format!("JoinConstraint({:?})", self.join_constraint))
+    }
+}
+
+/// Python wrapper around DataFusion's `Join` logical plan node.
+#[pyclass(name = "Join", module = "datafusion.expr", subclass)]
+#[derive(Clone)]
+pub struct PyJoin {
+    join: Join,
+}
+
+impl From<Join> for PyJoin {
+    // Wrap a DataFusion node so it can be handed to Python.
+    fn from(join: Join) -> Self {
+        Self { join }
+    }
+}
+
+impl From<PyJoin> for Join {
+    // Unwrap back to the DataFusion node by destructuring the wrapper.
+    fn from(wrapper: PyJoin) -> Self {
+        let PyJoin { join } = wrapper;
+        join
+    }
+}
+
+impl Display for PyJoin {
+    /// Renders the node as a multi-line summary, one `Debug`-formatted field
+    /// per line: left/right inputs, `on` keys, filter, join type, join
+    /// constraint, schema and the `null_equals_null` flag.
+    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
+        // `\` line continuations strip the source indentation, so each field
+        // starts on its own line. A raw multi-line literal would instead embed
+        // a newline plus indentation before every `\n`, producing
+        // whitespace-only lines between the fields.
+        write!(
+            f,
+            "Join\n\
+             Left: {:?}\n\
+             Right: {:?}\n\
+             On: {:?}\n\
+             Filter: {:?}\n\
+             JoinType: {:?}\n\
+             JoinConstraint: {:?}\n\
+             Schema: {:?}\n\
+             NullEqualsNull: {:?}",
+            &self.join.left,
+            &self.join.right,
+            &self.join.on,
+            &self.join.filter,
+            &self.join.join_type,
+            &self.join.join_constraint,
+            &self.join.schema,
+            &self.join.null_equals_null,
+        )
+    }
+}
+
+#[pymethods]
+impl PyJoin {
+    /// Retrieves the left input `LogicalPlan` of this `Join` node
+    fn left(&self) -> PyResult<PyLogicalPlan> {
+        Ok(self.join.left.as_ref().clone().into())
+    }
+
+    /// Retrieves the right input `LogicalPlan` of this `Join` node
+    fn right(&self) -> PyResult<PyLogicalPlan> {
+        Ok(self.join.right.as_ref().clone().into())
+    }
+
+    /// Retrieves the `on` key pairs of this `Join` node as `(left, right)`
+    /// expression tuples
+    fn on(&self) -> PyResult<Vec<(PyExpr, PyExpr)>> {
+        Ok(self
+            .join
+            .on
+            .iter()
+            .map(|(l, r)| (PyExpr::from(l.clone()), PyExpr::from(r.clone())))
+            .collect())
+    }
+
+    /// Retrieves the optional filter expression of this `Join` node
+    /// (`None` when the node has no filter)
+    fn filter(&self) -> PyResult<Option<PyExpr>> {
+        Ok(self.join.filter.clone().map(Into::into))
+    }
+
+    /// Retrieves the `JoinType` of this `Join` node
+    fn join_type(&self) -> PyResult<PyJoinType> {
+        Ok(self.join.join_type.into())
+    }
+
+    /// Retrieves the `JoinConstraint` of this `Join` node
+    fn join_constraint(&self) -> PyResult<PyJoinConstraint> {
+        Ok(self.join.join_constraint.into())
+    }
+
+    /// Resulting schema of this `Join` node
+    fn schema(&self) -> PyResult<PyDFSchema> {
+        Ok(self.join.schema.as_ref().clone().into())
+    }
+
+    /// If null_equals_null is true, null == null else null != null
+    fn null_equals_null(&self) -> PyResult<bool> {
+        Ok(self.join.null_equals_null)
+    }
+
+    /// Python `repr()`: this node's `Display` summary wrapped in `Join(...)`
+    fn __repr__(&self) -> PyResult<String> {
+        Ok(format!("Join({})", self))
+    }
+
+    /// Name of this logical plan node variant
+    fn __name__(&self) -> PyResult<String> {
+        Ok("Join".to_string())
+    }
+}
+
+impl LogicalNode for PyJoin {
+    /// Both inputs of the join, left first and then right.
+    fn inputs(&self) -> Vec<PyLogicalPlan> {
+        let left = (*self.join.left).clone();
+        let right = (*self.join.right).clone();
+        vec![PyLogicalPlan::from(left), PyLogicalPlan::from(right)]
+    }
+}
diff --git a/src/expr/signature.rs b/src/expr/signature.rs
index 2893bef..2f19498 100644
--- a/src/expr/signature.rs
+++ b/src/expr/signature.rs
@@ -18,6 +18,7 @@
use datafusion_expr::{TypeSignature, Volatility};
use pyo3::prelude::*;
+#[allow(dead_code)]
#[pyclass(name = "Signature", module = "datafusion.expr", subclass)]
#[allow(dead_code)]
#[derive(Clone)]