This is an automated email from the ASF dual-hosted git repository.

jiayuliu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new b20846e  add support for unary and binary values in values list, update docs (#1172)
b20846e is described below

commit b20846e56d287cb4f55b83f1642854d4155cdc76
Author: Jiayu Liu <[email protected]>
AuthorDate: Mon Oct 25 19:30:03 2021 +0800

    add support for unary and binary values in values list, update docs (#1172)
    
    * add support for unary and binary values in values list
    
    * add explain
    
    * fix clippy
---
 README.md                              |   3 +-
 datafusion/src/physical_plan/values.rs |  24 ++++-
 datafusion/src/sql/planner.rs          | 170 +++++++++++++++++++--------------
 datafusion/tests/sql.rs                |  38 ++++++++
 integration-tests/sqls/values_list.sql |  19 ++++
 integration-tests/test_psql_parity.py  |   2 +-
 6 files changed, 178 insertions(+), 78 deletions(-)

diff --git a/README.md b/README.md
index e06ad6a..ea946f4 100644
--- a/README.md
+++ b/README.md
@@ -161,6 +161,7 @@ DataFusion also includes a simple command-line interactive SQL utility. See the
 - [x] Common math functions
 - [x] cast
 - [x] try_cast
+- [x] [`VALUES` lists](https://www.postgresql.org/docs/current/queries-values.html)
 - Postgres compatible String functions
   - [x] ascii
   - [x] bit_length
@@ -193,7 +194,7 @@ DataFusion also includes a simple command-line interactive SQL utility. See the
 - Miscellaneous/Boolean functions
   - [x] nullif
 - Approximation functions
-  - [ ] approx_distinct
+  - [x] approx_distinct
 - Common date/time functions
   - [ ] Basic date functions
   - [ ] Basic time functions
diff --git a/datafusion/src/physical_plan/values.rs b/datafusion/src/physical_plan/values.rs
index 6658f67..de15d40 100644
--- a/datafusion/src/physical_plan/values.rs
+++ b/datafusion/src/physical_plan/values.rs
@@ -24,6 +24,7 @@ use crate::physical_plan::{
     Partitioning, PhysicalExpr,
 };
 use crate::scalar::ScalarValue;
+use arrow::array::new_null_array;
 use arrow::datatypes::SchemaRef;
 use arrow::record_batch::RecordBatch;
 use async_trait::async_trait;
@@ -48,10 +49,17 @@ impl ValuesExec {
         if data.is_empty() {
             return Err(DataFusionError::Plan("Values list cannot be 
empty".into()));
         }
-        // we have this empty batch as a placeholder to satisfy evaluation 
argument
-        let batch = RecordBatch::new_empty(schema.clone());
         let n_row = data.len();
         let n_col = schema.fields().len();
+        // we have this single row, null, typed batch as a placeholder to 
satisfy evaluation argument
+        let batch = RecordBatch::try_new(
+            schema.clone(),
+            schema
+                .fields()
+                .iter()
+                .map(|field| new_null_array(field.data_type(), 1))
+                .collect::<Vec<_>>(),
+        )?;
         let arr = (0..n_col)
             .map(|j| {
                 (0..n_row)
@@ -59,9 +67,15 @@ impl ValuesExec {
                         let r = data[i][j].evaluate(&batch);
                         match r {
                             Ok(ColumnarValue::Scalar(scalar)) => Ok(scalar),
-                            Ok(ColumnarValue::Array(_)) => 
Err(DataFusionError::Plan(
-                                "Cannot have array values in a values 
list".into(),
-                            )),
+                            Ok(ColumnarValue::Array(a)) if a.len() == 1 => {
+                                ScalarValue::try_from_array(&a, 0)
+                            }
+                            Ok(ColumnarValue::Array(a)) => {
+                                Err(DataFusionError::Plan(format!(
+                                    "Cannot have array values {:?} in a values 
list",
+                                    a
+                                )))
+                            }
                             Err(err) => Err(err),
                         }
                     })
diff --git a/datafusion/src/sql/planner.rs b/datafusion/src/sql/planner.rs
index a16c40a..73fb681 100644
--- a/datafusion/src/sql/planner.rs
+++ b/datafusion/src/sql/planner.rs
@@ -1069,17 +1069,92 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         }
     }
 
+    fn parse_sql_binary_op(
+        &self,
+        left: &SQLExpr,
+        op: &BinaryOperator,
+        right: &SQLExpr,
+        schema: &DFSchema,
+    ) -> Result<Expr> {
+        let operator = match *op {
+            BinaryOperator::Gt => Ok(Operator::Gt),
+            BinaryOperator::GtEq => Ok(Operator::GtEq),
+            BinaryOperator::Lt => Ok(Operator::Lt),
+            BinaryOperator::LtEq => Ok(Operator::LtEq),
+            BinaryOperator::Eq => Ok(Operator::Eq),
+            BinaryOperator::NotEq => Ok(Operator::NotEq),
+            BinaryOperator::Plus => Ok(Operator::Plus),
+            BinaryOperator::Minus => Ok(Operator::Minus),
+            BinaryOperator::Multiply => Ok(Operator::Multiply),
+            BinaryOperator::Divide => Ok(Operator::Divide),
+            BinaryOperator::Modulo => Ok(Operator::Modulo),
+            BinaryOperator::And => Ok(Operator::And),
+            BinaryOperator::Or => Ok(Operator::Or),
+            BinaryOperator::Like => Ok(Operator::Like),
+            BinaryOperator::NotLike => Ok(Operator::NotLike),
+            BinaryOperator::PGRegexMatch => Ok(Operator::RegexMatch),
+            BinaryOperator::PGRegexIMatch => Ok(Operator::RegexIMatch),
+            BinaryOperator::PGRegexNotMatch => Ok(Operator::RegexNotMatch),
+            BinaryOperator::PGRegexNotIMatch => Ok(Operator::RegexNotIMatch),
+            _ => Err(DataFusionError::NotImplemented(format!(
+                "Unsupported SQL binary operator {:?}",
+                op
+            ))),
+        }?;
+
+        Ok(Expr::BinaryExpr {
+            left: Box::new(self.sql_expr_to_logical_expr(left, schema)?),
+            op: operator,
+            right: Box::new(self.sql_expr_to_logical_expr(right, schema)?),
+        })
+    }
+
+    fn parse_sql_unary_op(
+        &self,
+        op: &UnaryOperator,
+        expr: &SQLExpr,
+        schema: &DFSchema,
+    ) -> Result<Expr> {
+        match op {
+            UnaryOperator::Not => Ok(Expr::Not(Box::new(
+                self.sql_expr_to_logical_expr(expr, schema)?,
+            ))),
+            UnaryOperator::Plus => Ok(self.sql_expr_to_logical_expr(expr, 
schema)?),
+            UnaryOperator::Minus => {
+                match expr {
+                    // optimization: if it's a number literal, we apply the 
negative operator
+                    // here directly to calculate the new literal.
+                    SQLExpr::Value(Value::Number(n,_)) => match 
n.parse::<i64>() {
+                        Ok(n) => Ok(lit(-n)),
+                        Err(_) => Ok(lit(-n
+                            .parse::<f64>()
+                            .map_err(|_e| {
+                                DataFusionError::Internal(format!(
+                                    "negative operator can be only applied to 
integer and float operands, got: {}",
+                                n))
+                            })?)),
+                    },
+                    // not a literal, apply negative operator on expression
+                    _ => 
Ok(Expr::Negative(Box::new(self.sql_expr_to_logical_expr(expr, schema)?))),
+                }
+            }
+            _ => Err(DataFusionError::NotImplemented(format!(
+                "Unsupported SQL unary operator {:?}",
+                op
+            ))),
+        }
+    }
+
     fn sql_values_to_plan(&self, values: &SQLValues) -> Result<LogicalPlan> {
+        // values should not be based on any other schema
+        let schema = DFSchema::empty();
         let values = values
             .0
             .iter()
             .map(|row| {
                 row.iter()
                     .map(|v| match v {
-                        SQLExpr::Value(Value::Number(n, _)) => match 
n.parse::<i64>() {
-                            Ok(n) => Ok(lit(n)),
-                            Err(_) => Ok(lit(n.parse::<f64>().unwrap())),
-                        },
+                        SQLExpr::Value(Value::Number(n, _)) => 
parse_sql_number(n),
                         SQLExpr::Value(Value::SingleQuotedString(ref s)) => {
                             Ok(lit(s.clone()))
                         }
@@ -1087,6 +1162,14 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                             Ok(Expr::Literal(ScalarValue::Utf8(None)))
                         }
                         SQLExpr::Value(Value::Boolean(n)) => Ok(lit(*n)),
+                        SQLExpr::UnaryOp { ref op, ref expr } => {
+                            self.parse_sql_unary_op(op, expr, &schema)
+                        }
+                        SQLExpr::BinaryOp {
+                            ref left,
+                            ref op,
+                            ref right,
+                        } => self.parse_sql_binary_op(left, op, right, 
&schema),
                         other => Err(DataFusionError::NotImplemented(format!(
                             "Unsupported value {:?} in a values list 
expression",
                             other
@@ -1100,14 +1183,9 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
 
     fn sql_expr_to_logical_expr(&self, sql: &SQLExpr, schema: &DFSchema) -> 
Result<Expr> {
         match sql {
-            SQLExpr::Value(Value::Number(n, _)) => match n.parse::<i64>() {
-                Ok(n) => Ok(lit(n)),
-                Err(_) => Ok(lit(n.parse::<f64>().unwrap())),
-            },
+            SQLExpr::Value(Value::Number(n, _)) => parse_sql_number(n),
             SQLExpr::Value(Value::SingleQuotedString(ref s)) => 
Ok(lit(s.clone())),
-
             SQLExpr::Value(Value::Boolean(n)) => Ok(lit(*n)),
-
             SQLExpr::Value(Value::Null) => 
Ok(Expr::Literal(ScalarValue::Utf8(None))),
             SQLExpr::Extract { field, expr } => Ok(Expr::ScalarFunction {
                 fun: functions::BuiltinScalarFunction::DatePart,
@@ -1244,34 +1322,9 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                 right: Box::new(self.sql_expr_to_logical_expr(right, schema)?),
             }),
 
-            SQLExpr::UnaryOp { ref op, ref expr } => match op {
-                UnaryOperator::Not => Ok(Expr::Not(Box::new(
-                    self.sql_expr_to_logical_expr(expr, schema)?,
-                ))),
-                UnaryOperator::Plus => Ok(self.sql_expr_to_logical_expr(expr, 
schema)?),
-                UnaryOperator::Minus => {
-                    match expr.as_ref() {
-                        // optimization: if it's a number literal, we apply 
the negative operator
-                        // here directly to calculate the new literal.
-                        SQLExpr::Value(Value::Number(n,_)) => match 
n.parse::<i64>() {
-                            Ok(n) => Ok(lit(-n)),
-                            Err(_) => Ok(lit(-n
-                                .parse::<f64>()
-                                .map_err(|_e| {
-                                    DataFusionError::Internal(format!(
-                                        "negative operator can be only applied 
to integer and float operands, got: {}",
-                                    n))
-                                })?)),
-                        },
-                        // not a literal, apply negative operator on expression
-                        _ => 
Ok(Expr::Negative(Box::new(self.sql_expr_to_logical_expr(expr, schema)?))),
-                    }
-                }
-                _ => Err(DataFusionError::NotImplemented(format!(
-                    "Unsupported SQL unary operator {:?}",
-                    op
-                ))),
-            },
+            SQLExpr::UnaryOp { ref op, ref expr } => {
+                self.parse_sql_unary_op(op, expr, schema)
+            }
 
             SQLExpr::Between {
                 ref expr,
@@ -1306,39 +1359,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                 ref left,
                 ref op,
                 ref right,
-            } => {
-                let operator = match *op {
-                    BinaryOperator::Gt => Ok(Operator::Gt),
-                    BinaryOperator::GtEq => Ok(Operator::GtEq),
-                    BinaryOperator::Lt => Ok(Operator::Lt),
-                    BinaryOperator::LtEq => Ok(Operator::LtEq),
-                    BinaryOperator::Eq => Ok(Operator::Eq),
-                    BinaryOperator::NotEq => Ok(Operator::NotEq),
-                    BinaryOperator::Plus => Ok(Operator::Plus),
-                    BinaryOperator::Minus => Ok(Operator::Minus),
-                    BinaryOperator::Multiply => Ok(Operator::Multiply),
-                    BinaryOperator::Divide => Ok(Operator::Divide),
-                    BinaryOperator::Modulo => Ok(Operator::Modulo),
-                    BinaryOperator::And => Ok(Operator::And),
-                    BinaryOperator::Or => Ok(Operator::Or),
-                    BinaryOperator::Like => Ok(Operator::Like),
-                    BinaryOperator::NotLike => Ok(Operator::NotLike),
-                    BinaryOperator::PGRegexMatch => Ok(Operator::RegexMatch),
-                    BinaryOperator::PGRegexIMatch => Ok(Operator::RegexIMatch),
-                    BinaryOperator::PGRegexNotMatch => 
Ok(Operator::RegexNotMatch),
-                    BinaryOperator::PGRegexNotIMatch => 
Ok(Operator::RegexNotIMatch),
-                    _ => Err(DataFusionError::NotImplemented(format!(
-                        "Unsupported SQL binary operator {:?}",
-                        op
-                    ))),
-                }?;
-
-                Ok(Expr::BinaryExpr {
-                    left: Box::new(self.sql_expr_to_logical_expr(left, 
schema)?),
-                    op: operator,
-                    right: Box::new(self.sql_expr_to_logical_expr(right, 
schema)?),
-                })
-            }
+            } => self.parse_sql_binary_op(left, op, right, schema),
 
             SQLExpr::Trim { expr, trim_where } => {
                 let (fun, where_expr) = match trim_where {
@@ -3630,3 +3651,10 @@ mod tests {
         quick_test(sql, expected);
     }
 }
+
+fn parse_sql_number(n: &str) -> Result<Expr> {
+    match n.parse::<i64>() {
+        Ok(n) => Ok(lit(n)),
+        Err(_) => Ok(lit(n.parse::<f64>().unwrap())),
+    }
+}
diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs
index 4d299ec..34b6b6c 100644
--- a/datafusion/tests/sql.rs
+++ b/datafusion/tests/sql.rs
@@ -492,6 +492,30 @@ async fn select_values_list() -> Result<()> {
         assert_batches_eq!(expected, &actual);
     }
     {
+        let sql = "VALUES (-1)";
+        let actual = execute_to_batches(&mut ctx, sql).await;
+        let expected = vec![
+            "+---------+",
+            "| column1 |",
+            "+---------+",
+            "| -1      |",
+            "+---------+",
+        ];
+        assert_batches_eq!(expected, &actual);
+    }
+    {
+        let sql = "VALUES (2+1,2-1,2>1)";
+        let actual = execute_to_batches(&mut ctx, sql).await;
+        let expected = vec![
+            "+---------+---------+---------+",
+            "| column1 | column2 | column3 |",
+            "+---------+---------+---------+",
+            "| 3       | 1       | true    |",
+            "+---------+---------+---------+",
+        ];
+        assert_batches_eq!(expected, &actual);
+    }
+    {
         let sql = "VALUES";
         let plan = ctx.create_logical_plan(sql);
         assert!(plan.is_err());
@@ -647,6 +671,20 @@ async fn select_values_list() -> Result<()> {
         ];
         assert_batches_eq!(expected, &actual);
     }
+    {
+        let sql = "EXPLAIN VALUES (1, 'a', -1, 1.1),(NULL, 'b', -3, 0.5)";
+        let actual = execute_to_batches(&mut ctx, sql).await;
+        let expected = vec![
+            
"+---------------+-----------------------------------------------------------------------------------------------------------+",
+            "| plan_type     | plan                                            
                                                          |",
+            
"+---------------+-----------------------------------------------------------------------------------------------------------+",
+            "| logical_plan  | Values: (Int64(1), Utf8(\"a\"), Int64(-1), 
Float64(1.1)), (Int64(NULL), Utf8(\"b\"), Int64(-3), Float64(0.5)) |",
+            "| physical_plan | ValuesExec                                      
                                                          |",
+            "|               |                                                 
                                                          |",
+            
"+---------------+-----------------------------------------------------------------------------------------------------------+",
+        ];
+        assert_batches_eq!(expected, &actual);
+    }
     Ok(())
 }
 
diff --git a/integration-tests/sqls/values_list.sql b/integration-tests/sqls/values_list.sql
new file mode 100644
index 0000000..b94e59a
--- /dev/null
+++ b/integration-tests/sqls/values_list.sql
@@ -0,0 +1,19 @@
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+
+-- http://www.apache.org/licenses/LICENSE-2.0
+
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+SELECT * FROM
+(VALUES (1,2.0,-3,1+1),(10,20.0,-30,2+2))
+AS tbl(int_col, float_col, negative_col, summation);
diff --git a/integration-tests/test_psql_parity.py b/integration-tests/test_psql_parity.py
index a85a2c2..e9776e0 100644
--- a/integration-tests/test_psql_parity.py
+++ b/integration-tests/test_psql_parity.py
@@ -77,7 +77,7 @@ test_files = set(root.glob("*.sql"))
 
 class TestPsqlParity:
     def test_tests_count(self):
-        assert len(test_files) == 15, "tests are missed"
+        assert len(test_files) == 16, "tests are missed"
 
     @pytest.mark.parametrize("fname", test_files)
     def test_sql_file(self, fname):

Reply via email to