This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 9bd3d50 ARROW-9759: [Rust] [DataFusion] Implement DataFrame.sort()
9bd3d50 is described below
commit 9bd3d505f8d571571ca674499646b545d7a64546
Author: Andy Grove <[email protected]>
AuthorDate: Sun Aug 16 15:32:43 2020 -0600
ARROW-9759: [Rust] [DataFusion] Implement DataFrame.sort()
- Implements `DataFrame.sort`, plus `Expr.sort` for turning any expression into a sort expression
- Improves Rustdoc code examples to use `?` instead of `unwrap()`
- Implements `Expr.and` and `Expr.or`, which were missing
Closes #7976 from andygrove/dataframe-sort
Authored-by: Andy Grove <[email protected]>
Signed-off-by: Andy Grove <[email protected]>
---
rust/datafusion/src/dataframe.rs | 141 +++++++++++++++---------
rust/datafusion/src/execution/dataframe_impl.rs | 6 +
rust/datafusion/src/logicalplan.rs | 24 ++++
3 files changed, 121 insertions(+), 50 deletions(-)
diff --git a/rust/datafusion/src/dataframe.rs b/rust/datafusion/src/dataframe.rs
index 08d455e..75618d1 100644
--- a/rust/datafusion/src/dataframe.rs
+++ b/rust/datafusion/src/dataframe.rs
@@ -35,72 +35,87 @@ use std::sync::Arc;
/// The query can be executed by calling the `collect` method.
///
/// ```
-/// use datafusion::ExecutionContext;
-/// use datafusion::execution::physical_plan::csv::CsvReadOptions;
-/// use datafusion::logicalplan::col;
-///
+/// # use datafusion::ExecutionContext;
+/// # use datafusion::error::Result;
+/// # use datafusion::execution::physical_plan::csv::CsvReadOptions;
+/// # use datafusion::logicalplan::col;
+/// # fn main() -> Result<()> {
/// let mut ctx = ExecutionContext::new();
-/// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new()).unwrap();
-/// let df = df.filter(col("a").lt_eq(col("b"))).unwrap()
-/// .aggregate(vec![col("a")], vec![df.min(col("b")).unwrap()]).unwrap()
-/// .limit(100).unwrap();
+/// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;
+/// let df = df.filter(col("a").lt_eq(col("b")))?
+/// .aggregate(vec![col("a")], vec![df.min(col("b"))?])?
+/// .limit(100)?;
/// let results = df.collect();
+/// # Ok(())
+/// # }
/// ```
pub trait DataFrame {
/// Filter the DataFrame by column. Returns a new DataFrame only containing the
/// specified columns.
///
/// ```
- /// use datafusion::ExecutionContext;
- /// use datafusion::execution::physical_plan::csv::CsvReadOptions;
+ /// # use datafusion::ExecutionContext;
+ /// # use datafusion::error::Result;
+ /// # use datafusion::execution::physical_plan::csv::CsvReadOptions;
+ /// # fn main() -> Result<()> {
/// let mut ctx = ExecutionContext::new();
- ///
- /// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new()).unwrap();
- /// let df = df.select_columns(vec!["a", "b"]).unwrap();
+ /// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;
+ /// let df = df.select_columns(vec!["a", "b"])?;
+ /// # Ok(())
+ /// # }
/// ```
fn select_columns(&self, columns: Vec<&str>) -> Result<Arc<dyn DataFrame>>;
/// Create a projection based on arbitrary expressions.
///
/// ```
- /// use datafusion::ExecutionContext;
- /// use datafusion::execution::physical_plan::csv::CsvReadOptions;
- /// use datafusion::logicalplan::col;
- ///
+ /// # use datafusion::ExecutionContext;
+ /// # use datafusion::error::Result;
+ /// # use datafusion::execution::physical_plan::csv::CsvReadOptions;
+ /// # use datafusion::logicalplan::col;
+ /// # fn main() -> Result<()> {
/// let mut ctx = ExecutionContext::new();
- /// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new()).unwrap();
- /// let df = df.select(vec![col("a").multiply(col("b")), col("c")]).unwrap();
+ /// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;
+ /// let df = df.select(vec![col("a").multiply(col("b")), col("c")])?;
+ /// # Ok(())
+ /// # }
/// ```
fn select(&self, expr: Vec<Expr>) -> Result<Arc<dyn DataFrame>>;
/// Filter a DataFrame to only include rows that match the specified filter expression.
///
/// ```
- /// use datafusion::ExecutionContext;
- /// use datafusion::execution::physical_plan::csv::CsvReadOptions;
- /// use datafusion::logicalplan::col;
- ///
+ /// # use datafusion::ExecutionContext;
+ /// # use datafusion::error::Result;
+ /// # use datafusion::execution::physical_plan::csv::CsvReadOptions;
+ /// # use datafusion::logicalplan::col;
+ /// # fn main() -> Result<()> {
/// let mut ctx = ExecutionContext::new();
- /// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new()).unwrap();
- /// let df = df.filter(col("a").lt_eq(col("b"))).unwrap();
+ /// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;
+ /// let df = df.filter(col("a").lt_eq(col("b")))?;
+ /// # Ok(())
+ /// # }
/// ```
fn filter(&self, expr: Expr) -> Result<Arc<dyn DataFrame>>;
/// Perform an aggregate query with optional grouping expressions.
///
/// ```
- /// use datafusion::ExecutionContext;
- /// use datafusion::execution::physical_plan::csv::CsvReadOptions;
- /// use datafusion::logicalplan::col;
- ///
+ /// # use datafusion::ExecutionContext;
+ /// # use datafusion::error::Result;
+ /// # use datafusion::execution::physical_plan::csv::CsvReadOptions;
+ /// # use datafusion::logicalplan::col;
+ /// # fn main() -> Result<()> {
/// let mut ctx = ExecutionContext::new();
- /// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new()).unwrap();
+ /// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;
///
/// // The following use is the equivalent of "SELECT MIN(b) GROUP BY a"
- /// let _ = df.aggregate(vec![col("a")], vec![df.min(col("b")).unwrap()]).unwrap();
+ /// let _ = df.aggregate(vec![col("a")], vec![df.min(col("b"))?])?;
///
/// // The following use is the equivalent of "SELECT MIN(b)"
- /// let _ = df.aggregate(vec![], vec![df.min(col("b")).unwrap()]).unwrap();
+ /// let _ = df.aggregate(vec![], vec![df.min(col("b"))?])?;
+ /// # Ok(())
+ /// # }
/// ```
fn aggregate(
&self,
@@ -108,29 +123,52 @@ pub trait DataFrame {
aggr_expr: Vec<Expr>,
) -> Result<Arc<dyn DataFrame>>;
- /// limit the number of rows returned from this DataFrame.
+ /// Limit the number of rows returned from this DataFrame.
///
/// ```
- /// use datafusion::ExecutionContext;
- /// use datafusion::execution::physical_plan::csv::CsvReadOptions;
- /// use datafusion::logicalplan::col;
- ///
+ /// # use datafusion::ExecutionContext;
+ /// # use datafusion::error::Result;
+ /// # use datafusion::execution::physical_plan::csv::CsvReadOptions;
+ /// # use datafusion::logicalplan::col;
+ /// # fn main() -> Result<()> {
/// let mut ctx = ExecutionContext::new();
- /// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new()).unwrap();
- /// let df = df.limit(100).unwrap();
+ /// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;
+ /// let df = df.limit(100)?;
+ /// # Ok(())
+ /// # }
/// ```
fn limit(&self, n: usize) -> Result<Arc<dyn DataFrame>>;
- /// Executes this DataFrame and collects all results into a vector of RecordBatch.
+ /// Sort the DataFrame by the specified sorting expressions. Any expression can be turned into
+ /// a sort expression by calling its [sort](../logicalplan/enum.Expr.html#method.sort) method.
///
/// ```
- /// use datafusion::ExecutionContext;
- /// use datafusion::execution::physical_plan::csv::CsvReadOptions;
- /// use datafusion::logicalplan::col;
+ /// # use datafusion::ExecutionContext;
+ /// # use datafusion::error::Result;
+ /// # use datafusion::execution::physical_plan::csv::CsvReadOptions;
+ /// # use datafusion::logicalplan::col;
+ /// # fn main() -> Result<()> {
+ /// let mut ctx = ExecutionContext::new();
+ /// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;
+ /// let df = df.sort(vec![col("a").sort(true, true), col("b").sort(false, false)])?; // a: ascending, nulls first; b: descending, nulls last
+ /// # Ok(())
+ /// # }
+ /// ```
+ fn sort(&self, expr: Vec<Expr>) -> Result<Arc<dyn DataFrame>>;
+
+ /// Executes this DataFrame and collects all results into a vector of RecordBatch.
///
+ /// ```
+ /// # use datafusion::ExecutionContext;
+ /// # use datafusion::error::Result;
+ /// # use datafusion::execution::physical_plan::csv::CsvReadOptions;
+ /// # use datafusion::logicalplan::col;
+ /// # fn main() -> Result<()> {
/// let mut ctx = ExecutionContext::new();
- /// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new()).unwrap();
- /// let batches = df.collect().unwrap();
+ /// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;
+ /// let batches = df.collect()?;
+ /// # Ok(())
+ /// # }
/// ```
fn collect(&self) -> Result<Vec<RecordBatch>>;
@@ -138,13 +176,16 @@ pub trait DataFrame {
/// where each column has a name, data type, and nullability attribute.
/// ```
- /// use datafusion::ExecutionContext;
- /// use datafusion::execution::physical_plan::csv::CsvReadOptions;
- /// use datafusion::logicalplan::col;
- ///
+ /// # use datafusion::ExecutionContext;
+ /// # use datafusion::error::Result;
+ /// # use datafusion::execution::physical_plan::csv::CsvReadOptions;
+ /// # use datafusion::logicalplan::col;
+ /// # fn main() -> Result<()> {
/// let mut ctx = ExecutionContext::new();
- /// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new()).unwrap();
+ /// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;
/// let schema = df.schema();
+ /// # Ok(())
+ /// # }
/// ```
fn schema(&self) -> &Schema;
diff --git a/rust/datafusion/src/execution/dataframe_impl.rs b/rust/datafusion/src/execution/dataframe_impl.rs
index 491cb70..1df3be4 100644
--- a/rust/datafusion/src/execution/dataframe_impl.rs
+++ b/rust/datafusion/src/execution/dataframe_impl.rs
@@ -92,6 +92,12 @@ impl DataFrame for DataFrameImpl {
Ok(Arc::new(DataFrameImpl::new(self.ctx_state.clone(), &plan)))
}
+ /// Sort by the given sort expressions: returns a new DataFrame whose logical plan adds a sort on top of this one's (builder errors are propagated)
+ fn sort(&self, expr: Vec<Expr>) -> Result<Arc<dyn DataFrame>> {
+ let plan = LogicalPlanBuilder::from(&self.plan).sort(expr)?.build()?;
+ Ok(Arc::new(DataFrameImpl::new(self.ctx_state.clone(), &plan)))
+ }
+
/// Create an expression to represent the min() aggregate function
fn min(&self, expr: Expr) -> Result<Expr> {
self.aggregate_expr("MIN", expr)
diff --git a/rust/datafusion/src/logicalplan.rs b/rust/datafusion/src/logicalplan.rs
index ee3c17a..dc8542b 100644
--- a/rust/datafusion/src/logicalplan.rs
+++ b/rust/datafusion/src/logicalplan.rs
@@ -465,6 +465,16 @@ impl Expr {
binary_expr(self.clone(), Operator::LtEq, other.clone())
}
+ /// Return the binary expression `self AND other` (built with `Operator::And`).
+ pub fn and(&self, other: Expr) -> Expr {
+ binary_expr(self.clone(), Operator::And, other)
+ }
+
+ /// Return the binary expression `self OR other` (built with `Operator::Or`).
+ pub fn or(&self, other: Expr) -> Expr {
+ binary_expr(self.clone(), Operator::Or, other)
+ }
+
/// Not
pub fn not(&self) -> Expr {
Expr::Not(Box::new(self.clone()))
@@ -499,6 +509,20 @@ impl Expr {
pub fn alias(&self, name: &str) -> Expr {
Expr::Alias(Box::new(self.clone()), name.to_owned())
}
+
+ /// Wrap a copy of this expression in an `Expr::Sort`; `asc` selects ascending order and `nulls_first` places nulls first.
+ ///
+ /// ```
+ /// # use datafusion::logicalplan::col;
+ /// let sort_expr = col("foo").sort(true, true); // ascending, nulls first
+ /// ```
+ pub fn sort(&self, asc: bool, nulls_first: bool) -> Expr {
+ Expr::Sort {
+ expr: Box::new(self.clone()),
+ asc,
+ nulls_first,
+ }
+ }
}
fn binary_expr(l: Expr, op: Operator, r: Expr) -> Expr {