This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new bd90e4afa8 Support `REPLACE` SQL alias syntax (#7368)
bd90e4afa8 is described below
commit bd90e4afa8fa7e9e52aa515ae36abc2b9eb59caf
Author: Berkay Şahin <[email protected]>
AuthorDate: Wed Aug 23 21:17:05 2023 +0300
Support `REPLACE` SQL alias syntax (#7368)
* replace implementation and tests
* Minor code changes
* fix after merge
---------
Co-authored-by: metesynnada <[email protected]>
---
datafusion/expr/src/utils.rs | 21 +++---
datafusion/sql/src/select.rs | 84 +++++++++++++++++++++---
datafusion/sqllogictest/test_files/functions.slt | 39 +++++++++++
3 files changed, 128 insertions(+), 16 deletions(-)
diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs
index 88fd9fa6b7..aae0b3a1b0 100644
--- a/datafusion/expr/src/utils.rs
+++ b/datafusion/expr/src/utils.rs
@@ -318,15 +318,15 @@ pub fn expr_to_columns(expr: &Expr, accum: &mut HashSet<Column>) -> Result<()> {
/// Find excluded columns in the schema, if any
/// SELECT * EXCLUDE(col1, col2), would return `vec![col1, col2]`
fn get_excluded_columns(
- opt_exclude: Option<ExcludeSelectItem>,
- opt_except: Option<ExceptSelectItem>,
+ opt_exclude: Option<&ExcludeSelectItem>,
+ opt_except: Option<&ExceptSelectItem>,
schema: &DFSchema,
qualifier: &Option<TableReference>,
) -> Result<Vec<Column>> {
let mut idents = vec![];
if let Some(excepts) = opt_except {
- idents.push(excepts.first_element);
- idents.extend(excepts.additional_elements);
+ idents.push(&excepts.first_element);
+ idents.extend(&excepts.additional_elements);
}
if let Some(exclude) = opt_exclude {
match exclude {
@@ -387,7 +387,7 @@ fn get_exprs_except_skipped(
pub fn expand_wildcard(
schema: &DFSchema,
plan: &LogicalPlan,
- wildcard_options: Option<WildcardAdditionalOptions>,
+ wildcard_options: Option<&WildcardAdditionalOptions>,
) -> Result<Vec<Expr>> {
let using_columns = plan.using_columns()?;
let mut columns_to_skip = using_columns
@@ -417,7 +417,7 @@ pub fn expand_wildcard(
..
}) = wildcard_options
{
- get_excluded_columns(opt_exclude, opt_except, schema, &None)?
+ get_excluded_columns(opt_exclude.as_ref(), opt_except.as_ref(),
schema, &None)?
} else {
vec![]
};
@@ -430,7 +430,7 @@ pub fn expand_wildcard(
pub fn expand_qualified_wildcard(
qualifier: &str,
schema: &DFSchema,
- wildcard_options: Option<WildcardAdditionalOptions>,
+ wildcard_options: Option<&WildcardAdditionalOptions>,
) -> Result<Vec<Expr>> {
let qualifier = TableReference::from(qualifier);
let qualified_fields: Vec<DFField> = schema
@@ -451,7 +451,12 @@ pub fn expand_qualified_wildcard(
..
}) = wildcard_options
{
- get_excluded_columns(opt_exclude, opt_except, schema,
&Some(qualifier))?
+ get_excluded_columns(
+ opt_exclude.as_ref(),
+ opt_except.as_ref(),
+ schema,
+ &Some(qualifier),
+ )?
} else {
vec![]
};
diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs
index e25fdc863b..23605d0377 100644
--- a/datafusion/sql/src/select.rs
+++ b/datafusion/sql/src/select.rs
@@ -24,6 +24,7 @@ use crate::utils::{
resolve_columns, resolve_positions_to_exprs,
};
+use datafusion_common::Column;
use datafusion_common::{
get_target_functional_dependencies, not_impl_err, plan_err, DFSchemaRef,
DataFusionError, Result,
@@ -40,7 +41,9 @@ use datafusion_expr::utils::{
use datafusion_expr::{
Expr, Filter, GroupingSet, LogicalPlan, LogicalPlanBuilder, Partitioning,
};
-use sqlparser::ast::{Distinct, Expr as SQLExpr, WildcardAdditionalOptions,
WindowType};
+use sqlparser::ast::{
+ Distinct, Expr as SQLExpr, ReplaceSelectItem, WildcardAdditionalOptions,
WindowType,
+};
use sqlparser::ast::{NamedWindowDefinition, Select, SelectItem,
TableWithJoins};
impl<'a, S: ContextProvider> SqlToRel<'a, S> {
@@ -359,17 +362,44 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
return plan_err!("SELECT * with no tables specified is not
valid");
}
// do not expand from outer schema
- expand_wildcard(plan.schema().as_ref(), plan, Some(options))
+ let expanded_exprs =
+ expand_wildcard(plan.schema().as_ref(), plan,
Some(&options))?;
+ // If there is a REPLACE statement, replace that column with the given
+ // replace expression. Column name remains the same.
+ if let Some(replace) = options.opt_replace {
+ self.replace_columns(
+ plan,
+ empty_from,
+ planner_context,
+ expanded_exprs,
+ replace,
+ )
+ } else {
+ Ok(expanded_exprs)
+ }
}
SelectItem::QualifiedWildcard(ref object_name, options) => {
Self::check_wildcard_options(&options)?;
let qualifier = format!("{object_name}");
// do not expand from outer schema
- expand_qualified_wildcard(
+ let expanded_exprs = expand_qualified_wildcard(
&qualifier,
plan.schema().as_ref(),
- Some(options),
- )
+ Some(&options),
+ )?;
+ // If there is a REPLACE statement, replace that column with the given
+ // replace expression. Column name remains the same.
+ if let Some(replace) = options.opt_replace {
+ self.replace_columns(
+ plan,
+ empty_from,
+ planner_context,
+ expanded_exprs,
+ replace,
+ )
+ } else {
+ Ok(expanded_exprs)
+ }
}
}
}
@@ -380,16 +410,54 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
opt_exclude: _opt_exclude,
opt_except: _opt_except,
opt_rename,
- opt_replace,
+ opt_replace: _opt_replace,
} = options;
- if opt_rename.is_some() || opt_replace.is_some() {
- not_impl_err!("wildcard * with RENAME or REPLACE not supported ")
+ if opt_rename.is_some() {
+ Err(DataFusionError::NotImplemented(
+ "wildcard * with RENAME not supported ".to_string(),
+ ))
} else {
Ok(())
}
}
+ /// If there is a REPLACE statement in the projected expression in the form of
+ /// "REPLACE (some_column_within_an_expr AS some_column)", this function replaces
+ /// that column with the given replace expression. Column name remains the same.
+ /// Multiple REPLACEs are also possible with comma separations.
+ fn replace_columns(
+ &self,
+ plan: &LogicalPlan,
+ empty_from: bool,
+ planner_context: &mut PlannerContext,
+ mut exprs: Vec<Expr>,
+ replace: ReplaceSelectItem,
+ ) -> Result<Vec<Expr>> {
+ for expr in exprs.iter_mut() {
+ if let Expr::Column(Column { name, .. }) = expr {
+ if let Some(item) = replace
+ .items
+ .iter()
+ .find(|item| item.column_name.value == *name)
+ {
+ let new_expr = self.sql_select_to_rex(
+ SelectItem::UnnamedExpr(item.expr.clone()),
+ plan,
+ empty_from,
+ planner_context,
+ )?[0]
+ .clone();
+ *expr = Expr::Alias(Alias {
+ expr: Box::new(new_expr),
+ name: name.clone(),
+ });
+ }
+ }
+ }
+ Ok(exprs)
+ }
+
/// Wrap a plan in a projection
fn project(&self, input: LogicalPlan, expr: Vec<Expr>) ->
Result<LogicalPlan> {
self.validate_schema_satisfies_exprs(input.schema(), &expr)?;
diff --git a/datafusion/sqllogictest/test_files/functions.slt b/datafusion/sqllogictest/test_files/functions.slt
index fd3a28dfe0..ac9fc942b4 100644
--- a/datafusion/sqllogictest/test_files/functions.slt
+++ b/datafusion/sqllogictest/test_files/functions.slt
@@ -772,3 +772,42 @@ query I
SELECT strpos(arrow_cast('helloworld', 'Dictionary(Int32, Utf8)'), 'world')
----
6
+
+statement ok
+CREATE TABLE products (
+product_id INT PRIMARY KEY,
+product_name VARCHAR(100),
+price DECIMAL(10, 2))
+
+statement ok
+INSERT INTO products (product_id, product_name, price) VALUES
+(1, 'OldBrand Product 1', 19.99),
+(2, 'OldBrand Product 2', 29.99),
+(3, 'OldBrand Product 3', 39.99),
+(4, 'OldBrand Product 4', 49.99)
+
+query ITR
+SELECT * REPLACE (price*2 AS price) FROM products
+----
+1 OldBrand Product 1 39.98
+2 OldBrand Product 2 59.98
+3 OldBrand Product 3 79.98
+4 OldBrand Product 4 99.98
+
+# types are conserved
+query ITR
+SELECT * REPLACE (product_id/2 AS product_id) FROM products
+----
+0 OldBrand Product 1 19.99
+1 OldBrand Product 2 29.99
+1 OldBrand Product 3 39.99
+2 OldBrand Product 4 49.99
+
+# multiple replace statements with qualified wildcard
+query ITR
+SELECT products.* REPLACE (price*2 AS price, product_id+1000 AS product_id)
FROM products
+----
+1001 OldBrand Product 1 39.98
+1002 OldBrand Product 2 59.98
+1003 OldBrand Product 3 79.98
+1004 OldBrand Product 4 99.98
\ No newline at end of file