This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-python.git
The following commit(s) were added to refs/heads/main by this push:
new 4869a86 Empty relation bindings (#208)
4869a86 is described below
commit 4869a86729c79cfcbcfb143d1e56bb7620c6f738
Author: Jeremy Dyer <[email protected]>
AuthorDate: Wed Feb 22 18:50:18 2023 -0500
Empty relation bindings (#208)
---
datafusion/cudf.py | 1 -
datafusion/tests/test_expr.py | 6 +--
src/common/df_schema.rs | 5 +++
src/expr.rs | 2 +
src/expr/aggregate.rs | 6 +--
src/expr/analyze.rs | 6 ++-
src/expr/{analyze.rs => empty_relation.rs} | 62 ++++++++++++++----------------
src/expr/filter.rs | 6 +--
src/expr/limit.rs | 6 +--
src/expr/logical_node.rs | 2 +-
src/expr/projection.rs | 10 +++--
src/expr/sort.rs | 6 +--
src/expr/table_scan.rs | 2 +-
src/sql/logical.rs | 2 +
14 files changed, 67 insertions(+), 55 deletions(-)
diff --git a/datafusion/cudf.py b/datafusion/cudf.py
index c38819c..d5f0215 100644
--- a/datafusion/cudf.py
+++ b/datafusion/cudf.py
@@ -30,7 +30,6 @@ class SessionContext:
self.datafusion_ctx.register_parquet(name, path)
def to_cudf_expr(self, expr):
-
# get Python wrapper for logical expression
expr = expr.to_variant()
diff --git a/datafusion/tests/test_expr.py b/datafusion/tests/test_expr.py
index 143eea6..0c4869f 100644
--- a/datafusion/tests/test_expr.py
+++ b/datafusion/tests/test_expr.py
@@ -60,7 +60,7 @@ def test_projection(test_ctx):
assert col3.op() == "<"
assert isinstance(col3.right().to_variant(), Literal)
- plan = plan.input().to_variant()
+ plan = plan.input()[0].to_variant()
assert isinstance(plan, TableScan)
@@ -71,7 +71,7 @@ def test_filter(test_ctx):
plan = plan.to_variant()
assert isinstance(plan, Projection)
- plan = plan.input().to_variant()
+ plan = plan.input()[0].to_variant()
assert isinstance(plan, Filter)
@@ -90,7 +90,7 @@ def test_aggregate_query(test_ctx):
projection = plan.to_variant()
assert isinstance(projection, Projection)
- aggregate = projection.input().to_variant()
+ aggregate = projection.input()[0].to_variant()
assert isinstance(aggregate, Aggregate)
col1 = aggregate.group_by_exprs()[0].to_variant()
diff --git a/src/common/df_schema.rs b/src/common/df_schema.rs
index c46a771..c16b8eb 100644
--- a/src/common/df_schema.rs
+++ b/src/common/df_schema.rs
@@ -49,4 +49,9 @@ impl PyDFSchema {
schema: Arc::new(DFSchema::empty()),
})
}
+
+ #[pyo3(name = "field_names")]
+ fn py_field_names(&self) -> PyResult<Vec<String>> {
+ Ok(self.schema.field_names())
+ }
}
diff --git a/src/expr.rs b/src/expr.rs
index 90ce6bf..5cd3aa5 100644
--- a/src/expr.rs
+++ b/src/expr.rs
@@ -34,6 +34,7 @@ pub mod aggregate_expr;
pub mod analyze;
pub mod binary_expr;
pub mod column;
+pub mod empty_relation;
pub mod filter;
pub mod limit;
pub mod literal;
@@ -185,5 +186,6 @@ pub(crate) fn init_module(m: &PyModule) -> PyResult<()> {
m.add_class::<aggregate::PyAggregate>()?;
m.add_class::<sort::PySort>()?;
m.add_class::<analyze::PyAnalyze>()?;
+ m.add_class::<empty_relation::PyEmptyRelation>()?;
Ok(())
}
diff --git a/src/expr/aggregate.rs b/src/expr/aggregate.rs
index 98d1f55..0449d16 100644
--- a/src/expr/aggregate.rs
+++ b/src/expr/aggregate.rs
@@ -85,8 +85,8 @@ impl PyAggregate {
}
// Retrieves the input `LogicalPlan` to this `Aggregate` node
- fn input(&self) -> PyLogicalPlan {
- PyLogicalPlan::from((*self.aggregate.input).clone())
+ fn input(&self) -> PyResult<Vec<PyLogicalPlan>> {
+ Ok(Self::inputs(self))
}
// Resulting Schema for this `Aggregate` node instance
@@ -100,7 +100,7 @@ impl PyAggregate {
}
impl LogicalNode for PyAggregate {
- fn input(&self) -> Vec<PyLogicalPlan> {
+ fn inputs(&self) -> Vec<PyLogicalPlan> {
vec![PyLogicalPlan::from((*self.aggregate.input).clone())]
}
}
diff --git a/src/expr/analyze.rs b/src/expr/analyze.rs
index 095fab0..5e195f7 100644
--- a/src/expr/analyze.rs
+++ b/src/expr/analyze.rs
@@ -59,6 +59,10 @@ impl PyAnalyze {
Ok(self.analyze.verbose)
}
+ fn input(&self) -> PyResult<Vec<PyLogicalPlan>> {
+ Ok(Self::inputs(self))
+ }
+
/// Resulting Schema for this `Analyze` node instance
fn schema(&self) -> PyResult<PyDFSchema> {
Ok((*self.analyze.schema).clone().into())
@@ -70,7 +74,7 @@ impl PyAnalyze {
}
impl LogicalNode for PyAnalyze {
- fn input(&self) -> Vec<PyLogicalPlan> {
+ fn inputs(&self) -> Vec<PyLogicalPlan> {
vec![PyLogicalPlan::from((*self.analyze.input).clone())]
}
}
diff --git a/src/expr/analyze.rs b/src/expr/empty_relation.rs
similarity index 50%
copy from src/expr/analyze.rs
copy to src/expr/empty_relation.rs
index 095fab0..f3008d1 100644
--- a/src/expr/analyze.rs
+++ b/src/expr/empty_relation.rs
@@ -15,62 +15,58 @@
// specific language governing permissions and limitations
// under the License.
-use datafusion_expr::logical_plan::Analyze;
+use crate::common::df_schema::PyDFSchema;
+use datafusion_expr::EmptyRelation;
use pyo3::prelude::*;
use std::fmt::{self, Display, Formatter};
-use crate::common::df_schema::PyDFSchema;
-use crate::expr::logical_node::LogicalNode;
-use crate::sql::logical::PyLogicalPlan;
-
-#[pyclass(name = "Analyze", module = "datafusion.expr", subclass)]
+#[pyclass(name = "EmptyRelation", module = "datafusion.expr", subclass)]
#[derive(Clone)]
-pub struct PyAnalyze {
- analyze: Analyze,
+pub struct PyEmptyRelation {
+ empty: EmptyRelation,
}
-impl PyAnalyze {
- pub fn new(analyze: Analyze) -> Self {
- Self { analyze }
+impl From<PyEmptyRelation> for EmptyRelation {
+ fn from(empty_relation: PyEmptyRelation) -> Self {
+ empty_relation.empty
}
}
-impl From<Analyze> for PyAnalyze {
- fn from(analyze: Analyze) -> PyAnalyze {
- PyAnalyze { analyze }
+impl From<EmptyRelation> for PyEmptyRelation {
+ fn from(empty: EmptyRelation) -> PyEmptyRelation {
+ PyEmptyRelation { empty }
}
}
-impl From<PyAnalyze> for Analyze {
- fn from(analyze: PyAnalyze) -> Self {
- analyze.analyze
- }
-}
-
-impl Display for PyAnalyze {
+impl Display for PyEmptyRelation {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
- write!(f, "Analyze Table")
+ write!(
+ f,
+ "Empty Relation
+ \nProduce One Row: {:?}
+ \nSchema: {:?}",
+ &self.empty.produce_one_row, &self.empty.schema
+ )
}
}
#[pymethods]
-impl PyAnalyze {
- fn verbose(&self) -> PyResult<bool> {
- Ok(self.analyze.verbose)
+impl PyEmptyRelation {
+ fn produce_one_row(&self) -> PyResult<bool> {
+ Ok(self.empty.produce_one_row)
}
- /// Resulting Schema for this `Analyze` node instance
+ /// Resulting Schema for this `EmptyRelation` node instance
fn schema(&self) -> PyResult<PyDFSchema> {
- Ok((*self.analyze.schema).clone().into())
+ Ok((*self.empty.schema).clone().into())
}
- fn __repr__(&self) -> PyResult<String> {
- Ok(format!("Analyze({})", self))
+ /// Get a String representation of this `EmptyRelation` node
+ fn __repr__(&self) -> String {
+ format!("{}", self)
}
-}
-impl LogicalNode for PyAnalyze {
- fn input(&self) -> Vec<PyLogicalPlan> {
- vec![PyLogicalPlan::from((*self.analyze.input).clone())]
+ fn __name__(&self) -> PyResult<String> {
+ Ok("EmptyRelation".to_string())
}
}
diff --git a/src/expr/filter.rs b/src/expr/filter.rs
index b7b48b9..0994b4e 100644
--- a/src/expr/filter.rs
+++ b/src/expr/filter.rs
@@ -62,8 +62,8 @@ impl PyFilter {
}
/// Retrieves the input `LogicalPlan` to this `Filter` node
- fn input(&self) -> PyLogicalPlan {
- PyLogicalPlan::from((*self.filter.input).clone())
+ fn input(&self) -> PyResult<Vec<PyLogicalPlan>> {
+ Ok(Self::inputs(self))
}
/// Resulting Schema for this `Filter` node instance
@@ -77,7 +77,7 @@ impl PyFilter {
}
impl LogicalNode for PyFilter {
- fn input(&self) -> Vec<PyLogicalPlan> {
+ fn inputs(&self) -> Vec<PyLogicalPlan> {
vec![PyLogicalPlan::from((*self.filter.input).clone())]
}
}
diff --git a/src/expr/limit.rs b/src/expr/limit.rs
index a50e5b8..2366be6 100644
--- a/src/expr/limit.rs
+++ b/src/expr/limit.rs
@@ -67,8 +67,8 @@ impl PyLimit {
}
/// Retrieves the input `LogicalPlan` to this `Limit` node
- fn input(&self) -> PyLogicalPlan {
- PyLogicalPlan::from((*self.limit.input).clone())
+ fn input(&self) -> PyResult<Vec<PyLogicalPlan>> {
+ Ok(Self::inputs(self))
}
/// Resulting Schema for this `Limit` node instance
@@ -82,7 +82,7 @@ impl PyLimit {
}
impl LogicalNode for PyLimit {
- fn input(&self) -> Vec<PyLogicalPlan> {
+ fn inputs(&self) -> Vec<PyLogicalPlan> {
vec![PyLogicalPlan::from((*self.limit.input).clone())]
}
}
diff --git a/src/expr/logical_node.rs b/src/expr/logical_node.rs
index 1bb3fa7..7d4fe54 100644
--- a/src/expr/logical_node.rs
+++ b/src/expr/logical_node.rs
@@ -21,5 +21,5 @@ use crate::sql::logical::PyLogicalPlan;
/// any "node" shares these common traits in common.
pub trait LogicalNode {
/// The input plan to the current logical node instance.
- fn input(&self) -> Vec<PyLogicalPlan>;
+ fn inputs(&self) -> Vec<PyLogicalPlan>;
}
diff --git a/src/expr/projection.rs b/src/expr/projection.rs
index 4c158f7..2551803 100644
--- a/src/expr/projection.rs
+++ b/src/expr/projection.rs
@@ -74,8 +74,8 @@ impl PyProjection {
}
/// Retrieves the input `LogicalPlan` to this `Projection` node
- fn input(&self) -> PyLogicalPlan {
- PyLogicalPlan::from((*self.projection.input).clone())
+ fn input(&self) -> PyResult<Vec<PyLogicalPlan>> {
+ Ok(Self::inputs(self))
}
/// Resulting Schema for this `Projection` node instance
@@ -86,10 +86,14 @@ impl PyProjection {
fn __repr__(&self) -> PyResult<String> {
Ok(format!("Projection({})", self))
}
+
+ fn __name__(&self) -> PyResult<String> {
+ Ok("Projection".to_string())
+ }
}
impl LogicalNode for PyProjection {
- fn input(&self) -> Vec<PyLogicalPlan> {
+ fn inputs(&self) -> Vec<PyLogicalPlan> {
vec![PyLogicalPlan::from((*self.projection.input).clone())]
}
}
diff --git a/src/expr/sort.rs b/src/expr/sort.rs
index 1d0a7f6..5037b6d 100644
--- a/src/expr/sort.rs
+++ b/src/expr/sort.rs
@@ -73,8 +73,8 @@ impl PySort {
}
/// Retrieves the input `LogicalPlan` to this `Sort` node
- fn input(&self) -> PyLogicalPlan {
- PyLogicalPlan::from((*self.sort.input).clone())
+ fn input(&self) -> PyResult<Vec<PyLogicalPlan>> {
+ Ok(Self::inputs(self))
}
/// Resulting Schema for this `Sort` node instance
@@ -88,7 +88,7 @@ impl PySort {
}
impl LogicalNode for PySort {
- fn input(&self) -> Vec<PyLogicalPlan> {
+ fn inputs(&self) -> Vec<PyLogicalPlan> {
vec![PyLogicalPlan::from((*self.sort.input).clone())]
}
}
diff --git a/src/expr/table_scan.rs b/src/expr/table_scan.rs
index 2784523..bd9e7db 100644
--- a/src/expr/table_scan.rs
+++ b/src/expr/table_scan.rs
@@ -127,7 +127,7 @@ impl PyTableScan {
}
impl LogicalNode for PyTableScan {
- fn input(&self) -> Vec<PyLogicalPlan> {
+ fn inputs(&self) -> Vec<PyLogicalPlan> {
// table scans are leaf nodes and do not have inputs
vec![]
}
diff --git a/src/sql/logical.rs b/src/sql/logical.rs
index ee48f1e..5a21ca8 100644
--- a/src/sql/logical.rs
+++ b/src/sql/logical.rs
@@ -20,6 +20,7 @@ use std::sync::Arc;
use crate::errors::py_runtime_err;
use crate::expr::aggregate::PyAggregate;
use crate::expr::analyze::PyAnalyze;
+use crate::expr::empty_relation::PyEmptyRelation;
use crate::expr::filter::PyFilter;
use crate::expr::limit::PyLimit;
use crate::expr::projection::PyProjection;
@@ -54,6 +55,7 @@ impl PyLogicalPlan {
Python::with_gil(|_| match self.plan.as_ref() {
LogicalPlan::Aggregate(plan) =>
Ok(PyAggregate::from(plan.clone()).into_py(py)),
LogicalPlan::Analyze(plan) =>
Ok(PyAnalyze::from(plan.clone()).into_py(py)),
+ LogicalPlan::EmptyRelation(plan) =>
Ok(PyEmptyRelation::from(plan.clone()).into_py(py)),
LogicalPlan::Filter(plan) =>
Ok(PyFilter::from(plan.clone()).into_py(py)),
LogicalPlan::Limit(plan) =>
Ok(PyLimit::from(plan.clone()).into_py(py)),
LogicalPlan::Projection(plan) =>
Ok(PyProjection::from(plan.clone()).into_py(py)),