This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 9bd3d50  ARROW-9759: [Rust] [DataFusion] Implement DataFrame.sort()
9bd3d50 is described below

commit 9bd3d505f8d571571ca674499646b545d7a64546
Author: Andy Grove <[email protected]>
AuthorDate: Sun Aug 16 15:32:43 2020 -0600

    ARROW-9759: [Rust] [DataFusion] Implement DataFrame.sort()
    
    - Implements `DataFrame.sort`
    - Improves Rustdoc code examples to use `?` instead of `unwrap()`
    - Implements `Expr.and` and `Expr.or` since I noticed those were missing
    
    Closes #7976 from andygrove/dataframe-sort
    
    Authored-by: Andy Grove <[email protected]>
    Signed-off-by: Andy Grove <[email protected]>
---
 rust/datafusion/src/dataframe.rs                | 141 +++++++++++++++---------
 rust/datafusion/src/execution/dataframe_impl.rs |   6 +
 rust/datafusion/src/logicalplan.rs              |  24 ++++
 3 files changed, 121 insertions(+), 50 deletions(-)

diff --git a/rust/datafusion/src/dataframe.rs b/rust/datafusion/src/dataframe.rs
index 08d455e..75618d1 100644
--- a/rust/datafusion/src/dataframe.rs
+++ b/rust/datafusion/src/dataframe.rs
@@ -35,72 +35,87 @@ use std::sync::Arc;
 /// The query can be executed by calling the `collect` method.
 ///
 /// ```
-/// use datafusion::ExecutionContext;
-/// use datafusion::execution::physical_plan::csv::CsvReadOptions;
-/// use datafusion::logicalplan::col;
-///
+/// # use datafusion::ExecutionContext;
+/// # use datafusion::error::Result;
+/// # use datafusion::execution::physical_plan::csv::CsvReadOptions;
+/// # use datafusion::logicalplan::col;
+/// # fn main() -> Result<()> {
 /// let mut ctx = ExecutionContext::new();
-/// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new()).unwrap();
-/// let df = df.filter(col("a").lt_eq(col("b"))).unwrap()
-///            .aggregate(vec![col("a")], vec![df.min(col("b")).unwrap()]).unwrap()
-///            .limit(100).unwrap();
+/// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;
+/// let df = df.filter(col("a").lt_eq(col("b")))?
+///            .aggregate(vec![col("a")], vec![df.min(col("b"))?])?
+///            .limit(100)?;
 /// let results = df.collect();
+/// # Ok(())
+/// # }
 /// ```
 pub trait DataFrame {
     /// Filter the DataFrame by column. Returns a new DataFrame only containing the
     /// specified columns.
     ///
     /// ```
-    /// use datafusion::ExecutionContext;
-    /// use datafusion::execution::physical_plan::csv::CsvReadOptions;
+    /// # use datafusion::ExecutionContext;
+    /// # use datafusion::error::Result;
+    /// # use datafusion::execution::physical_plan::csv::CsvReadOptions;
+    /// # fn main() -> Result<()> {
     /// let mut ctx = ExecutionContext::new();
-    ///
-    /// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new()).unwrap();
-    /// let df = df.select_columns(vec!["a", "b"]).unwrap();
+    /// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;
+    /// let df = df.select_columns(vec!["a", "b"])?;
+    /// # Ok(())
+    /// # }
     /// ```
     fn select_columns(&self, columns: Vec<&str>) -> Result<Arc<dyn DataFrame>>;
 
     /// Create a projection based on arbitrary expressions.
     ///
     /// ```
-    /// use datafusion::ExecutionContext;
-    /// use datafusion::execution::physical_plan::csv::CsvReadOptions;
-    /// use datafusion::logicalplan::col;
-    ///
+    /// # use datafusion::ExecutionContext;
+    /// # use datafusion::error::Result;
+    /// # use datafusion::execution::physical_plan::csv::CsvReadOptions;
+    /// # use datafusion::logicalplan::col;
+    /// # fn main() -> Result<()> {
     /// let mut ctx = ExecutionContext::new();
-    /// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new()).unwrap();
-    /// let df = df.select(vec![col("a").multiply(col("b")), col("c")]).unwrap();
+    /// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;
+    /// let df = df.select(vec![col("a").multiply(col("b")), col("c")])?;
+    /// # Ok(())
+    /// # }
     /// ```
     fn select(&self, expr: Vec<Expr>) -> Result<Arc<dyn DataFrame>>;
 
     /// Filter a DataFrame to only include rows that match the specified filter expression.
     ///
     /// ```
-    /// use datafusion::ExecutionContext;
-    /// use datafusion::execution::physical_plan::csv::CsvReadOptions;
-    /// use datafusion::logicalplan::col;
-    ///
+    /// # use datafusion::ExecutionContext;
+    /// # use datafusion::error::Result;
+    /// # use datafusion::execution::physical_plan::csv::CsvReadOptions;
+    /// # use datafusion::logicalplan::col;
+    /// # fn main() -> Result<()> {
     /// let mut ctx = ExecutionContext::new();
-    /// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new()).unwrap();
-    /// let df = df.filter(col("a").lt_eq(col("b"))).unwrap();
+    /// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;
+    /// let df = df.filter(col("a").lt_eq(col("b")))?;
+    /// # Ok(())
+    /// # }
     /// ```
     fn filter(&self, expr: Expr) -> Result<Arc<dyn DataFrame>>;
 
     /// Perform an aggregate query with optional grouping expressions.
     ///
     /// ```
-    /// use datafusion::ExecutionContext;
-    /// use datafusion::execution::physical_plan::csv::CsvReadOptions;
-    /// use datafusion::logicalplan::col;
-    ///
+    /// # use datafusion::ExecutionContext;
+    /// # use datafusion::error::Result;
+    /// # use datafusion::execution::physical_plan::csv::CsvReadOptions;
+    /// # use datafusion::logicalplan::col;
+    /// # fn main() -> Result<()> {
     /// let mut ctx = ExecutionContext::new();
-    /// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new()).unwrap();
+    /// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;
     ///
     /// // The following use is the equivalent of "SELECT MIN(b) GROUP BY a"
-    /// let _ = df.aggregate(vec![col("a")], vec![df.min(col("b")).unwrap()]).unwrap();
+    /// let _ = df.aggregate(vec![col("a")], vec![df.min(col("b"))?])?;
     ///
     /// // The following use is the equivalent of "SELECT MIN(b)"
-    /// let _ = df.aggregate(vec![], vec![df.min(col("b")).unwrap()]).unwrap();
+    /// let _ = df.aggregate(vec![], vec![df.min(col("b"))?])?;
+    /// # Ok(())
+    /// # }
     /// ```
     fn aggregate(
         &self,
@@ -108,29 +123,52 @@ pub trait DataFrame {
         aggr_expr: Vec<Expr>,
     ) -> Result<Arc<dyn DataFrame>>;
 
-    /// limit the number of rows returned from this DataFrame.
+    /// Limit the number of rows returned from this DataFrame.
     ///
     /// ```
-    /// use datafusion::ExecutionContext;
-    /// use datafusion::execution::physical_plan::csv::CsvReadOptions;
-    /// use datafusion::logicalplan::col;
-    ///
+    /// # use datafusion::ExecutionContext;
+    /// # use datafusion::error::Result;
+    /// # use datafusion::execution::physical_plan::csv::CsvReadOptions;
+    /// # use datafusion::logicalplan::col;
+    /// # fn main() -> Result<()> {
     /// let mut ctx = ExecutionContext::new();
-    /// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new()).unwrap();
-    /// let df = df.limit(100).unwrap();
+    /// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;
+    /// let df = df.limit(100)?;
+    /// # Ok(())
+    /// # }
     /// ```
     fn limit(&self, n: usize) -> Result<Arc<dyn DataFrame>>;
 
-    /// Executes this DataFrame and collects all results into a vector of RecordBatch.
+    /// Sort the DataFrame by the specified sorting expressions. Any expression can be turned into
+    /// a sort expression by calling its [sort](../logicalplan/enum.Expr.html#method.sort) method.
     ///
     /// ```
-    /// use datafusion::ExecutionContext;
-    /// use datafusion::execution::physical_plan::csv::CsvReadOptions;
-    /// use datafusion::logicalplan::col;
+    /// # use datafusion::ExecutionContext;
+    /// # use datafusion::error::Result;
+    /// # use datafusion::execution::physical_plan::csv::CsvReadOptions;
+    /// # use datafusion::logicalplan::col;
+    /// # fn main() -> Result<()> {
+    /// let mut ctx = ExecutionContext::new();
+    /// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;
+    /// let df = df.sort(vec![col("a").sort(true, true), col("b").sort(false, false)])?;
+    /// # Ok(())
+    /// # }
+    /// ```
+    fn sort(&self, expr: Vec<Expr>) -> Result<Arc<dyn DataFrame>>;
+
+    /// Executes this DataFrame and collects all results into a vector of RecordBatch.
     ///
+    /// ```
+    /// # use datafusion::ExecutionContext;
+    /// # use datafusion::error::Result;
+    /// # use datafusion::execution::physical_plan::csv::CsvReadOptions;
+    /// # use datafusion::logicalplan::col;
+    /// # fn main() -> Result<()> {
     /// let mut ctx = ExecutionContext::new();
-    /// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new()).unwrap();
-    /// let batches = df.collect().unwrap();
+    /// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;
+    /// let batches = df.collect()?;
+    /// # Ok(())
+    /// # }
     /// ```
     fn collect(&self) -> Result<Vec<RecordBatch>>;
 
@@ -138,13 +176,16 @@ pub trait DataFrame {
     /// where each column has a name, data type, and nullability attribute.
 
     /// ```
-    /// use datafusion::ExecutionContext;
-    /// use datafusion::execution::physical_plan::csv::CsvReadOptions;
-    /// use datafusion::logicalplan::col;
-    ///
+    /// # use datafusion::ExecutionContext;
+    /// # use datafusion::error::Result;
+    /// # use datafusion::execution::physical_plan::csv::CsvReadOptions;
+    /// # use datafusion::logicalplan::col;
+    /// # fn main() -> Result<()> {
     /// let mut ctx = ExecutionContext::new();
-    /// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new()).unwrap();
+    /// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;
     /// let schema = df.schema();
+    /// # Ok(())
+    /// # }
     /// ```
     fn schema(&self) -> &Schema;
 
diff --git a/rust/datafusion/src/execution/dataframe_impl.rs b/rust/datafusion/src/execution/dataframe_impl.rs
index 491cb70..1df3be4 100644
--- a/rust/datafusion/src/execution/dataframe_impl.rs
+++ b/rust/datafusion/src/execution/dataframe_impl.rs
@@ -92,6 +92,12 @@ impl DataFrame for DataFrameImpl {
         Ok(Arc::new(DataFrameImpl::new(self.ctx_state.clone(), &plan)))
     }
 
+    /// Sort by specified sorting expressions
+    fn sort(&self, expr: Vec<Expr>) -> Result<Arc<dyn DataFrame>> {
+        let plan = LogicalPlanBuilder::from(&self.plan).sort(expr)?.build()?;
+        Ok(Arc::new(DataFrameImpl::new(self.ctx_state.clone(), &plan)))
+    }
+
     /// Create an expression to represent the min() aggregate function
     fn min(&self, expr: Expr) -> Result<Expr> {
         self.aggregate_expr("MIN", expr)
diff --git a/rust/datafusion/src/logicalplan.rs b/rust/datafusion/src/logicalplan.rs
index ee3c17a..dc8542b 100644
--- a/rust/datafusion/src/logicalplan.rs
+++ b/rust/datafusion/src/logicalplan.rs
@@ -465,6 +465,16 @@ impl Expr {
         binary_expr(self.clone(), Operator::LtEq, other.clone())
     }
 
+    /// And
+    pub fn and(&self, other: Expr) -> Expr {
+        binary_expr(self.clone(), Operator::And, other)
+    }
+
+    /// Or
+    pub fn or(&self, other: Expr) -> Expr {
+        binary_expr(self.clone(), Operator::Or, other)
+    }
+
     /// Not
     pub fn not(&self) -> Expr {
         Expr::Not(Box::new(self.clone()))
@@ -499,6 +509,20 @@ impl Expr {
     pub fn alias(&self, name: &str) -> Expr {
         Expr::Alias(Box::new(self.clone()), name.to_owned())
     }
+
+    /// Create a sort expression from an existing expression.
+    ///
+    /// ```
+    /// # use datafusion::logicalplan::col;
+    /// let sort_expr = col("foo").sort(true, true); // SORT ASC NULLS_FIRST
+    /// ```
+    pub fn sort(&self, asc: bool, nulls_first: bool) -> Expr {
+        Expr::Sort {
+            expr: Box::new(self.clone()),
+            asc,
+            nulls_first,
+        }
+    }
 }
 
 fn binary_expr(l: Expr, op: Operator, r: Expr) -> Expr {

Reply via email to