This is an automated email from the ASF dual-hosted git repository.
jiayuliu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new b20846e add support for unary and binary values in values list, update docs (#1172)
b20846e is described below
commit b20846e56d287cb4f55b83f1642854d4155cdc76
Author: Jiayu Liu <[email protected]>
AuthorDate: Mon Oct 25 19:30:03 2021 +0800
add support for unary and binary values in values list, update docs (#1172)
* add support for unary and binary values in values list
* add explain
* fix clippy
---
README.md | 3 +-
datafusion/src/physical_plan/values.rs | 24 ++++-
datafusion/src/sql/planner.rs | 170 +++++++++++++++++++--------------
datafusion/tests/sql.rs | 38 ++++++++
integration-tests/sqls/values_list.sql | 19 ++++
integration-tests/test_psql_parity.py | 2 +-
6 files changed, 178 insertions(+), 78 deletions(-)
diff --git a/README.md b/README.md
index e06ad6a..ea946f4 100644
--- a/README.md
+++ b/README.md
@@ -161,6 +161,7 @@ DataFusion also includes a simple command-line interactive SQL utility. See the
- [x] Common math functions
- [x] cast
- [x] try_cast
+- [x] [`VALUES` lists](https://www.postgresql.org/docs/current/queries-values.html)
- Postgres compatible String functions
- [x] ascii
- [x] bit_length
@@ -193,7 +194,7 @@ DataFusion also includes a simple command-line interactive
SQL utility. See the
- Miscellaneous/Boolean functions
- [x] nullif
- Approximation functions
- - [ ] approx_distinct
+ - [x] approx_distinct
- Common date/time functions
- [ ] Basic date functions
- [ ] Basic time functions
diff --git a/datafusion/src/physical_plan/values.rs b/datafusion/src/physical_plan/values.rs
index 6658f67..de15d40 100644
--- a/datafusion/src/physical_plan/values.rs
+++ b/datafusion/src/physical_plan/values.rs
@@ -24,6 +24,7 @@ use crate::physical_plan::{
Partitioning, PhysicalExpr,
};
use crate::scalar::ScalarValue;
+use arrow::array::new_null_array;
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use async_trait::async_trait;
@@ -48,10 +49,17 @@ impl ValuesExec {
if data.is_empty() {
return Err(DataFusionError::Plan("Values list cannot be empty".into()));
}
- // we have this empty batch as a placeholder to satisfy evaluation
argument
- let batch = RecordBatch::new_empty(schema.clone());
let n_row = data.len();
let n_col = schema.fields().len();
+ // we have this single row, null, typed batch as a placeholder to satisfy evaluation argument
+ let batch = RecordBatch::try_new(
+ schema.clone(),
+ schema
+ .fields()
+ .iter()
+ .map(|field| new_null_array(field.data_type(), 1))
+ .collect::<Vec<_>>(),
+ )?;
let arr = (0..n_col)
.map(|j| {
(0..n_row)
@@ -59,9 +67,15 @@ impl ValuesExec {
let r = data[i][j].evaluate(&batch);
match r {
Ok(ColumnarValue::Scalar(scalar)) => Ok(scalar),
- Ok(ColumnarValue::Array(_)) =>
Err(DataFusionError::Plan(
- "Cannot have array values in a values
list".into(),
- )),
+ Ok(ColumnarValue::Array(a)) if a.len() == 1 => {
+ ScalarValue::try_from_array(&a, 0)
+ }
+ Ok(ColumnarValue::Array(a)) => {
+ Err(DataFusionError::Plan(format!(
+ "Cannot have array values {:?} in a values list",
+ a
+ )))
+ }
Err(err) => Err(err),
}
})
diff --git a/datafusion/src/sql/planner.rs b/datafusion/src/sql/planner.rs
index a16c40a..73fb681 100644
--- a/datafusion/src/sql/planner.rs
+++ b/datafusion/src/sql/planner.rs
@@ -1069,17 +1069,92 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
}
}
+ fn parse_sql_binary_op(
+ &self,
+ left: &SQLExpr,
+ op: &BinaryOperator,
+ right: &SQLExpr,
+ schema: &DFSchema,
+ ) -> Result<Expr> {
+ let operator = match *op {
+ BinaryOperator::Gt => Ok(Operator::Gt),
+ BinaryOperator::GtEq => Ok(Operator::GtEq),
+ BinaryOperator::Lt => Ok(Operator::Lt),
+ BinaryOperator::LtEq => Ok(Operator::LtEq),
+ BinaryOperator::Eq => Ok(Operator::Eq),
+ BinaryOperator::NotEq => Ok(Operator::NotEq),
+ BinaryOperator::Plus => Ok(Operator::Plus),
+ BinaryOperator::Minus => Ok(Operator::Minus),
+ BinaryOperator::Multiply => Ok(Operator::Multiply),
+ BinaryOperator::Divide => Ok(Operator::Divide),
+ BinaryOperator::Modulo => Ok(Operator::Modulo),
+ BinaryOperator::And => Ok(Operator::And),
+ BinaryOperator::Or => Ok(Operator::Or),
+ BinaryOperator::Like => Ok(Operator::Like),
+ BinaryOperator::NotLike => Ok(Operator::NotLike),
+ BinaryOperator::PGRegexMatch => Ok(Operator::RegexMatch),
+ BinaryOperator::PGRegexIMatch => Ok(Operator::RegexIMatch),
+ BinaryOperator::PGRegexNotMatch => Ok(Operator::RegexNotMatch),
+ BinaryOperator::PGRegexNotIMatch => Ok(Operator::RegexNotIMatch),
+ _ => Err(DataFusionError::NotImplemented(format!(
+ "Unsupported SQL binary operator {:?}",
+ op
+ ))),
+ }?;
+
+ Ok(Expr::BinaryExpr {
+ left: Box::new(self.sql_expr_to_logical_expr(left, schema)?),
+ op: operator,
+ right: Box::new(self.sql_expr_to_logical_expr(right, schema)?),
+ })
+ }
+
+ fn parse_sql_unary_op(
+ &self,
+ op: &UnaryOperator,
+ expr: &SQLExpr,
+ schema: &DFSchema,
+ ) -> Result<Expr> {
+ match op {
+ UnaryOperator::Not => Ok(Expr::Not(Box::new(
+ self.sql_expr_to_logical_expr(expr, schema)?,
+ ))),
+ UnaryOperator::Plus => Ok(self.sql_expr_to_logical_expr(expr,
schema)?),
+ UnaryOperator::Minus => {
+ match expr {
+ // optimization: if it's a number literal, we apply the
negative operator
+ // here directly to calculate the new literal.
+ SQLExpr::Value(Value::Number(n,_)) => match
n.parse::<i64>() {
+ Ok(n) => Ok(lit(-n)),
+ Err(_) => Ok(lit(-n
+ .parse::<f64>()
+ .map_err(|_e| {
+ DataFusionError::Internal(format!(
+ "negative operator can be only applied to integer and float operands, got: {}",
+ n))
+ })?)),
+ },
+ // not a literal, apply negative operator on expression
+ _ =>
Ok(Expr::Negative(Box::new(self.sql_expr_to_logical_expr(expr, schema)?))),
+ }
+ }
+ _ => Err(DataFusionError::NotImplemented(format!(
+ "Unsupported SQL unary operator {:?}",
+ op
+ ))),
+ }
+ }
+
fn sql_values_to_plan(&self, values: &SQLValues) -> Result<LogicalPlan> {
+ // values should not be based on any other schema
+ let schema = DFSchema::empty();
let values = values
.0
.iter()
.map(|row| {
row.iter()
.map(|v| match v {
- SQLExpr::Value(Value::Number(n, _)) => match
n.parse::<i64>() {
- Ok(n) => Ok(lit(n)),
- Err(_) => Ok(lit(n.parse::<f64>().unwrap())),
- },
+ SQLExpr::Value(Value::Number(n, _)) =>
parse_sql_number(n),
SQLExpr::Value(Value::SingleQuotedString(ref s)) => {
Ok(lit(s.clone()))
}
@@ -1087,6 +1162,14 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
Ok(Expr::Literal(ScalarValue::Utf8(None)))
}
SQLExpr::Value(Value::Boolean(n)) => Ok(lit(*n)),
+ SQLExpr::UnaryOp { ref op, ref expr } => {
+ self.parse_sql_unary_op(op, expr, &schema)
+ }
+ SQLExpr::BinaryOp {
+ ref left,
+ ref op,
+ ref right,
+ } => self.parse_sql_binary_op(left, op, right,
&schema),
other => Err(DataFusionError::NotImplemented(format!(
"Unsupported value {:?} in a values list expression",
other
@@ -1100,14 +1183,9 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
fn sql_expr_to_logical_expr(&self, sql: &SQLExpr, schema: &DFSchema) ->
Result<Expr> {
match sql {
- SQLExpr::Value(Value::Number(n, _)) => match n.parse::<i64>() {
- Ok(n) => Ok(lit(n)),
- Err(_) => Ok(lit(n.parse::<f64>().unwrap())),
- },
+ SQLExpr::Value(Value::Number(n, _)) => parse_sql_number(n),
SQLExpr::Value(Value::SingleQuotedString(ref s)) =>
Ok(lit(s.clone())),
-
SQLExpr::Value(Value::Boolean(n)) => Ok(lit(*n)),
-
SQLExpr::Value(Value::Null) =>
Ok(Expr::Literal(ScalarValue::Utf8(None))),
SQLExpr::Extract { field, expr } => Ok(Expr::ScalarFunction {
fun: functions::BuiltinScalarFunction::DatePart,
@@ -1244,34 +1322,9 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
right: Box::new(self.sql_expr_to_logical_expr(right, schema)?),
}),
- SQLExpr::UnaryOp { ref op, ref expr } => match op {
- UnaryOperator::Not => Ok(Expr::Not(Box::new(
- self.sql_expr_to_logical_expr(expr, schema)?,
- ))),
- UnaryOperator::Plus => Ok(self.sql_expr_to_logical_expr(expr,
schema)?),
- UnaryOperator::Minus => {
- match expr.as_ref() {
- // optimization: if it's a number literal, we apply
the negative operator
- // here directly to calculate the new literal.
- SQLExpr::Value(Value::Number(n,_)) => match
n.parse::<i64>() {
- Ok(n) => Ok(lit(-n)),
- Err(_) => Ok(lit(-n
- .parse::<f64>()
- .map_err(|_e| {
- DataFusionError::Internal(format!(
- "negative operator can be only applied
to integer and float operands, got: {}",
- n))
- })?)),
- },
- // not a literal, apply negative operator on expression
- _ =>
Ok(Expr::Negative(Box::new(self.sql_expr_to_logical_expr(expr, schema)?))),
- }
- }
- _ => Err(DataFusionError::NotImplemented(format!(
- "Unsupported SQL unary operator {:?}",
- op
- ))),
- },
+ SQLExpr::UnaryOp { ref op, ref expr } => {
+ self.parse_sql_unary_op(op, expr, schema)
+ }
SQLExpr::Between {
ref expr,
@@ -1306,39 +1359,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
ref left,
ref op,
ref right,
- } => {
- let operator = match *op {
- BinaryOperator::Gt => Ok(Operator::Gt),
- BinaryOperator::GtEq => Ok(Operator::GtEq),
- BinaryOperator::Lt => Ok(Operator::Lt),
- BinaryOperator::LtEq => Ok(Operator::LtEq),
- BinaryOperator::Eq => Ok(Operator::Eq),
- BinaryOperator::NotEq => Ok(Operator::NotEq),
- BinaryOperator::Plus => Ok(Operator::Plus),
- BinaryOperator::Minus => Ok(Operator::Minus),
- BinaryOperator::Multiply => Ok(Operator::Multiply),
- BinaryOperator::Divide => Ok(Operator::Divide),
- BinaryOperator::Modulo => Ok(Operator::Modulo),
- BinaryOperator::And => Ok(Operator::And),
- BinaryOperator::Or => Ok(Operator::Or),
- BinaryOperator::Like => Ok(Operator::Like),
- BinaryOperator::NotLike => Ok(Operator::NotLike),
- BinaryOperator::PGRegexMatch => Ok(Operator::RegexMatch),
- BinaryOperator::PGRegexIMatch => Ok(Operator::RegexIMatch),
- BinaryOperator::PGRegexNotMatch =>
Ok(Operator::RegexNotMatch),
- BinaryOperator::PGRegexNotIMatch =>
Ok(Operator::RegexNotIMatch),
- _ => Err(DataFusionError::NotImplemented(format!(
- "Unsupported SQL binary operator {:?}",
- op
- ))),
- }?;
-
- Ok(Expr::BinaryExpr {
- left: Box::new(self.sql_expr_to_logical_expr(left,
schema)?),
- op: operator,
- right: Box::new(self.sql_expr_to_logical_expr(right,
schema)?),
- })
- }
+ } => self.parse_sql_binary_op(left, op, right, schema),
SQLExpr::Trim { expr, trim_where } => {
let (fun, where_expr) = match trim_where {
@@ -3630,3 +3651,10 @@ mod tests {
quick_test(sql, expected);
}
}
+
+fn parse_sql_number(n: &str) -> Result<Expr> {
+ match n.parse::<i64>() {
+ Ok(n) => Ok(lit(n)),
+ Err(_) => Ok(lit(n.parse::<f64>().unwrap())),
+ }
+}
diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs
index 4d299ec..34b6b6c 100644
--- a/datafusion/tests/sql.rs
+++ b/datafusion/tests/sql.rs
@@ -492,6 +492,30 @@ async fn select_values_list() -> Result<()> {
assert_batches_eq!(expected, &actual);
}
{
+ let sql = "VALUES (-1)";
+ let actual = execute_to_batches(&mut ctx, sql).await;
+ let expected = vec![
+ "+---------+",
+ "| column1 |",
+ "+---------+",
+ "| -1 |",
+ "+---------+",
+ ];
+ assert_batches_eq!(expected, &actual);
+ }
+ {
+ let sql = "VALUES (2+1,2-1,2>1)";
+ let actual = execute_to_batches(&mut ctx, sql).await;
+ let expected = vec![
+ "+---------+---------+---------+",
+ "| column1 | column2 | column3 |",
+ "+---------+---------+---------+",
+ "| 3 | 1 | true |",
+ "+---------+---------+---------+",
+ ];
+ assert_batches_eq!(expected, &actual);
+ }
+ {
let sql = "VALUES";
let plan = ctx.create_logical_plan(sql);
assert!(plan.is_err());
@@ -647,6 +671,20 @@ async fn select_values_list() -> Result<()> {
];
assert_batches_eq!(expected, &actual);
}
+ {
+ let sql = "EXPLAIN VALUES (1, 'a', -1, 1.1),(NULL, 'b', -3, 0.5)";
+ let actual = execute_to_batches(&mut ctx, sql).await;
+ let expected = vec![
+
"+---------------+-----------------------------------------------------------------------------------------------------------+",
+ "| plan_type | plan
|",
+
"+---------------+-----------------------------------------------------------------------------------------------------------+",
+ "| logical_plan | Values: (Int64(1), Utf8(\"a\"), Int64(-1),
Float64(1.1)), (Int64(NULL), Utf8(\"b\"), Int64(-3), Float64(0.5)) |",
+ "| physical_plan | ValuesExec
|",
+ "| |
|",
+
"+---------------+-----------------------------------------------------------------------------------------------------------+",
+ ];
+ assert_batches_eq!(expected, &actual);
+ }
Ok(())
}
diff --git a/integration-tests/sqls/values_list.sql b/integration-tests/sqls/values_list.sql
new file mode 100644
index 0000000..b94e59a
--- /dev/null
+++ b/integration-tests/sqls/values_list.sql
@@ -0,0 +1,19 @@
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements. See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership. The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License. You may obtain a copy of the License at
+
+-- http://www.apache.org/licenses/LICENSE-2.0
+
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+SELECT * FROM
+(VALUES (1,2.0,-3,1+1),(10,20.0,-30,2+2))
+AS tbl(int_col, float_col, negative_col, summation);
diff --git a/integration-tests/test_psql_parity.py b/integration-tests/test_psql_parity.py
index a85a2c2..e9776e0 100644
--- a/integration-tests/test_psql_parity.py
+++ b/integration-tests/test_psql_parity.py
@@ -77,7 +77,7 @@ test_files = set(root.glob("*.sql"))
class TestPsqlParity:
def test_tests_count(self):
- assert len(test_files) == 15, "tests are missed"
+ assert len(test_files) == 16, "tests are missed"
@pytest.mark.parametrize("fname", test_files)
def test_sql_file(self, fname):