houqp commented on a change in pull request #873:
URL: https://github.com/apache/arrow-datafusion/pull/873#discussion_r738950355



##########
File path: python/src/dataframe.rs
##########
@@ -15,174 +15,94 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::sync::{Arc, Mutex};
+use std::sync::Arc;
 
-use logical_plan::LogicalPlan;
-use pyo3::{prelude::*, types::PyTuple};
+use pyo3::prelude::*;
 use tokio::runtime::Runtime;
 
-use datafusion::execution::context::ExecutionContext as _ExecutionContext;
-use datafusion::logical_plan::{JoinType, LogicalPlanBuilder};
-use datafusion::physical_plan::collect;
-use datafusion::{execution::context::ExecutionContextState, logical_plan};
-
-use crate::{errors, to_py};
-use crate::{errors::DataFusionError, expression};
+use datafusion::arrow::datatypes::Schema;
+use datafusion::arrow::pyarrow::PyArrowConvert;
 use datafusion::arrow::util::pretty;
+use datafusion::dataframe::DataFrame;
+use datafusion::logical_plan::JoinType;
+
+use crate::{errors::DataFusionError, expression::PyExpr};
 
-/// A DataFrame is a representation of a logical plan and an API to compose 
statements.
+/// A PyDataFrame is a representation of a logical plan and an API to compose 
statements.
 /// Use it to build a plan and `.collect()` to execute the plan and collect 
the result.
 /// The actual execution of a plan runs natively on Rust and Arrow on a 
multi-threaded environment.
-#[pyclass]
-pub(crate) struct DataFrame {
-    ctx_state: Arc<Mutex<ExecutionContextState>>,
-    plan: LogicalPlan,
+#[pyclass(name = "DataFrame")]
+#[derive(Clone)]
+pub(crate) struct PyDataFrame {
+    df: Arc<dyn DataFrame>,
 }
 
-impl DataFrame {
-    /// creates a new DataFrame
-    pub fn new(ctx_state: Arc<Mutex<ExecutionContextState>>, plan: 
LogicalPlan) -> Self {
-        Self { ctx_state, plan }
+impl PyDataFrame {
+    /// creates a new PyDataFrame
+    pub fn new(df: Arc<dyn DataFrame>) -> Self {
+        Self { df }
     }
 }
 
 #[pymethods]
-impl DataFrame {
-    /// Select `expressions` from the existing DataFrame.
-    #[args(args = "*")]
-    fn select(&self, args: &PyTuple) -> PyResult<Self> {
-        let expressions = expression::from_tuple(args)?;
-        let builder = LogicalPlanBuilder::from(self.plan.clone());
-        let builder =
-            errors::wrap(builder.project(expressions.into_iter().map(|e| 
e.expr)))?;
-        let plan = errors::wrap(builder.build())?;
-
-        Ok(DataFrame {
-            ctx_state: self.ctx_state.clone(),
-            plan,
-        })
+impl PyDataFrame {
+    /// Returns the schema from the logical plan
+    fn schema(&self) -> Schema {
+        self.df.schema().into()
     }
 
-    /// Filter according to the `predicate` expression
-    fn filter(&self, predicate: expression::Expression) -> PyResult<Self> {
-        let builder = LogicalPlanBuilder::from(self.plan.clone());
-        let builder = errors::wrap(builder.filter(predicate.expr))?;
-        let plan = errors::wrap(builder.build())?;
+    #[args(args = "*")]
+    fn select(&self, args: Vec<PyExpr>) -> PyResult<Self> {
+        let expr = args.into_iter().map(|e| e.into()).collect();
+        let df = self.df.select(expr)?;
+        Ok(Self::new(df))
+    }
 
-        Ok(DataFrame {
-            ctx_state: self.ctx_state.clone(),
-            plan,
-        })
+    fn filter(&self, predicate: PyExpr) -> PyResult<Self> {
+        let df = self.df.filter(predicate.into())?;
+        Ok(Self::new(df))
     }
 
-    /// Aggregates using expressions
-    fn aggregate(
-        &self,
-        group_by: Vec<expression::Expression>,
-        aggs: Vec<expression::Expression>,
-    ) -> PyResult<Self> {
-        let builder = LogicalPlanBuilder::from(self.plan.clone());
-        let builder = errors::wrap(builder.aggregate(
-            group_by.into_iter().map(|e| e.expr),
-            aggs.into_iter().map(|e| e.expr),
-        ))?;
-        let plan = errors::wrap(builder.build())?;
-
-        Ok(DataFrame {
-            ctx_state: self.ctx_state.clone(),
-            plan,
-        })
+    fn aggregate(&self, group_by: Vec<PyExpr>, aggs: Vec<PyExpr>) -> 
PyResult<Self> {
+        let group_by = group_by.into_iter().map(|e| e.into()).collect();
+        let aggs = aggs.into_iter().map(|e| e.into()).collect();
+        let df = self.df.aggregate(group_by, aggs)?;
+        Ok(Self::new(df))
     }
 
-    /// Sort by specified sorting expressions
-    fn sort(&self, exprs: Vec<expression::Expression>) -> PyResult<Self> {
-        let exprs = exprs.into_iter().map(|e| e.expr);
-        let builder = LogicalPlanBuilder::from(self.plan.clone());
-        let builder = errors::wrap(builder.sort(exprs))?;
-        let plan = errors::wrap(builder.build())?;
-        Ok(DataFrame {
-            ctx_state: self.ctx_state.clone(),
-            plan,
-        })
+    #[args(exprs = "*")]
+    fn sort(&self, exprs: Vec<PyExpr>) -> PyResult<Self> {
+        let exprs = exprs.into_iter().map(|e| e.into()).collect();
+        let df = self.df.sort(exprs)?;
+        Ok(Self::new(df))
     }
 
-    /// Limits the plan to return at most `count` rows
     fn limit(&self, count: usize) -> PyResult<Self> {
-        let builder = LogicalPlanBuilder::from(self.plan.clone());
-        let builder = errors::wrap(builder.limit(count))?;
-        let plan = errors::wrap(builder.build())?;
-
-        Ok(DataFrame {
-            ctx_state: self.ctx_state.clone(),
-            plan,
-        })
+        let df = self.df.limit(count)?;
+        Ok(Self::new(df))
     }
 
     /// Executes the plan, returning a list of `RecordBatch`es.
-    /// Unless some order is specified in the plan, there is no guarantee of 
the order of the result
-    fn collect(&self, py: Python) -> PyResult<PyObject> {
-        let ctx = _ExecutionContext::from(self.ctx_state.clone());
+    /// Unless some order is specified in the plan, there is no
+    /// guarantee of the order of the result.
+    fn collect(&self, py: Python) -> PyResult<Vec<PyObject>> {
         let rt = Runtime::new().unwrap();
-        let plan = ctx
-            .optimize(&self.plan)
-            .map_err(|e| -> errors::DataFusionError { e.into() })?;
-
-        let plan = py.allow_threads(|| {
-            rt.block_on(async {
-                ctx.create_physical_plan(&plan)
-                    .await
-                    .map_err(|e| -> errors::DataFusionError { e.into() })
-            })
-        })?;
-
-        let batches = py.allow_threads(|| {
-            rt.block_on(async {
-                collect(plan)
-                    .await
-                    .map_err(|e| -> errors::DataFusionError { e.into() })
-            })
-        })?;
-        to_py::to_py(&batches)
+        let batches = py.allow_threads(|| rt.block_on(self.df.collect()))?;
+        // cannot use PyResult<Vec<RecordBatch>> return type due to
+        // https://github.com/PyO3/pyo3/issues/1813
+        batches.into_iter().map(|rb| rb.to_pyarrow(py)).collect()
     }
 
     /// Print the result, 20 lines by default
     #[args(num = "20")]
     fn show(&self, py: Python, num: usize) -> PyResult<()> {
-        let ctx = _ExecutionContext::from(self.ctx_state.clone());
         let rt = Runtime::new().unwrap();
-        let plan = py.allow_threads(|| {
-            rt.block_on(async {
-                let l_plan = ctx
-                    .optimize(&self.limit(num)?.plan)
-                    .map_err(|e| -> errors::DataFusionError { e.into() })?;
-                let p_plan = ctx
-                    .create_physical_plan(&l_plan)
-                    .await
-                    .map_err(|e| -> errors::DataFusionError { e.into() })?;
-                Ok::<_, PyErr>(p_plan)
-            })
-        })?;
-
-        let batches = py.allow_threads(|| {
-            rt.block_on(async {
-                collect(plan)
-                    .await
-                    .map_err(|e| -> errors::DataFusionError { e.into() })
-            })
-        })?;
-
-        Ok(pretty::print_batches(&batches).unwrap())
+        let df = self.df.limit(num)?;
+        let batches = py.allow_threads(|| rt.block_on(df.collect()))?;
+        Ok(pretty::print_batches(&batches)?)
     }
 
-    /// Returns the join of two DataFrames `on`.
-    fn join(
-        &self,
-        right: &DataFrame,
-        join_keys: (Vec<&str>, Vec<&str>),
-        how: &str,
-    ) -> PyResult<Self> {
-        let builder = LogicalPlanBuilder::from(self.plan.clone());
-
+    fn join(&self, right: PyDataFrame, on: Vec<&str>, how: &str) -> 
PyResult<Self> {

Review comment:
       `Vec<&str>` was the original signature; DataFusion later added support 
for joining on different column keys, so it was split into two arguments.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to