This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-python.git
The following commit(s) were added to refs/heads/main by this push:
new 4004fbe Add Python wrapper for LogicalPlan::Union (#240)
4004fbe is described below
commit 4004fbe2e8052698b7bcf59c1b77b735dc19771d
Author: Ian Alexander Joiner <[email protected]>
AuthorDate: Tue Feb 28 12:50:34 2023 -0500
Add Python wrapper for LogicalPlan::Union (#240)
* add union
* Make clippy happier
---
datafusion/tests/test_imports.py | 2 +
src/context.rs | 2 +-
src/expr.rs | 2 +
src/expr/signature.rs | 1 +
src/expr/union.rs | 85 ++++++++++++++++++++++++++++++++++++++++
5 files changed, 91 insertions(+), 1 deletion(-)
diff --git a/datafusion/tests/test_imports.py b/datafusion/tests/test_imports.py
index 20e7abb..8792fac 100644
--- a/datafusion/tests/test_imports.py
+++ b/datafusion/tests/test_imports.py
@@ -44,6 +44,7 @@ from datafusion.expr import (
Aggregate,
Sort,
Analyze,
+ Union,
Like,
ILike,
SimilarTo,
@@ -105,6 +106,7 @@ def test_class_module_is_datafusion():
Limit,
Filter,
Analyze,
+ Union,
Like,
ILike,
SimilarTo,
diff --git a/src/context.rs b/src/context.rs
index 1acf5f2..4a19377 100644
--- a/src/context.rs
+++ b/src/context.rs
@@ -605,7 +605,7 @@ impl PySessionContext {
let plan = plan.plan.clone();
let fut:
JoinHandle<datafusion_common::Result<SendableRecordBatchStream>> =
rt.spawn(async move { plan.execute(part, ctx) });
- let stream = wait_for_future(py, fut).map_err(|e|
py_datafusion_err(e))?;
+ let stream = wait_for_future(py, fut).map_err(py_datafusion_err)?;
Ok(PyRecordBatchStream::new(stream?))
}
}
diff --git a/src/expr.rs b/src/expr.rs
index a288b38..c4a9f00 100644
--- a/src/expr.rs
+++ b/src/expr.rs
@@ -67,6 +67,7 @@ pub mod signature;
pub mod sort;
pub mod subquery;
pub mod table_scan;
+pub mod union;
/// A PyExpr that can be used on a DataFrame
#[pyclass(name = "Expr", module = "datafusion.expr", subclass)]
@@ -266,5 +267,6 @@ pub(crate) fn init_module(m: &PyModule) -> PyResult<()> {
m.add_class::<sort::PySort>()?;
m.add_class::<analyze::PyAnalyze>()?;
m.add_class::<empty_relation::PyEmptyRelation>()?;
+ m.add_class::<union::PyUnion>()?;
Ok(())
}
diff --git a/src/expr/signature.rs b/src/expr/signature.rs
index c59c990..2893bef 100644
--- a/src/expr/signature.rs
+++ b/src/expr/signature.rs
@@ -19,6 +19,7 @@ use datafusion_expr::{TypeSignature, Volatility};
use pyo3::prelude::*;
#[pyclass(name = "Signature", module = "datafusion.expr", subclass)]
+#[allow(dead_code)]
#[derive(Clone)]
pub struct PySignature {
type_signature: TypeSignature,
diff --git a/src/expr/union.rs b/src/expr/union.rs
new file mode 100644
index 0000000..186fbed
--- /dev/null
+++ b/src/expr/union.rs
@@ -0,0 +1,85 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datafusion_expr::logical_plan::Union;
+use pyo3::prelude::*;
+use std::fmt::{self, Display, Formatter};
+
+use crate::common::df_schema::PyDFSchema;
+use crate::expr::logical_node::LogicalNode;
+use crate::sql::logical::PyLogicalPlan;
+
+/// Python-facing wrapper around DataFusion's `Union` logical plan node.
+#[pyclass(name = "Union", module = "datafusion.expr", subclass)]
+#[derive(Clone)]
+pub struct PyUnion {
+    /// The wrapped DataFusion `Union` node.
+    union_: Union,
+}
+
+impl From<Union> for PyUnion {
+    /// Takes ownership of a DataFusion `Union` and wraps it.
+    fn from(node: Union) -> Self {
+        Self { union_: node }
+    }
+}
+
+impl From<PyUnion> for Union {
+    /// Consumes the wrapper and hands back the inner DataFusion `Union`.
+    fn from(wrapper: PyUnion) -> Self {
+        let PyUnion { union_ } = wrapper;
+        union_
+    }
+}
+
+impl Display for PyUnion {
+    /// Renders the node as a `Union` header line, followed by one line with the
+    /// debug form of its inputs and one line with the debug form of its schema.
+    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
+        // Keep the literal on a single source line. A string literal that spans
+        // source lines also embeds the newline and the following indentation,
+        // which together with the `\n` escapes produced whitespace-only blank
+        // lines between the fields.
+        write!(
+            f,
+            "Union\nInputs: {:?}\nSchema: {:?}",
+            &self.union_.inputs, &self.union_.schema,
+        )
+    }
+}
+
+#[pymethods]
+impl PyUnion {
+    /// Retrieves all input `LogicalPlan`s of this `Union` node.
+    ///
+    /// A `Union` holds a collection of inputs, so this returns a list (in the
+    /// order the inputs are stored on the node) rather than a single plan.
+    fn input(&self) -> PyResult<Vec<PyLogicalPlan>> {
+        Ok(LogicalNode::inputs(self))
+    }
+
+    /// Resulting Schema for this `Union` node instance (a copy of the node's
+    /// schema wrapped as a `PyDFSchema`).
+    fn schema(&self) -> PyResult<PyDFSchema> {
+        Ok(self.union_.schema.as_ref().clone().into())
+    }
+
+    /// Python `repr`: the `Display` rendering of this node wrapped in `Union(...)`.
+    fn __repr__(&self) -> PyResult<String> {
+        Ok(format!("Union({})", self))
+    }
+
+    /// Name of this logical node type, always `"Union"`.
+    fn __name__(&self) -> PyResult<String> {
+        Ok("Union".to_string())
+    }
+}
+
+impl LogicalNode for PyUnion {
+    /// Returns every input plan of the wrapped `Union`, each cloned out of its
+    /// shared pointer and converted into a `PyLogicalPlan`.
+    fn inputs(&self) -> Vec<PyLogicalPlan> {
+        let Union { inputs, .. } = &self.union_;
+        inputs
+            .iter()
+            .map(|plan| (**plan).clone())
+            .map(Into::into)
+            .collect()
+    }
+}